一、low版
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import queue
import time


class threadpool:
    """Token-based concurrency limiter.

    A bounded queue is pre-filled with ``max_num`` references to the
    ``threading.Thread`` class. Taking one out is the permission to start a
    thread; putting one back frees the slot for the next caller.
    """

    def __init__(self, max_num=20):
        """Create the pool with ``max_num`` available slots."""
        self.queue = queue.Queue(max_num)
        for _ in range(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        """Take a slot, blocking while none is free; returns the Thread class."""
        return self.queue.get()

    def add_thread(self):
        """Return a slot to the pool (blocks if the pool is already full)."""
        self.queue.put(threading.Thread)


def fun(pool, a1, delay=1):
    """Demo task: wait ``delay`` seconds, print ``a1``, then free the slot.

    :param pool: the ``threadpool`` whose slot this task occupies
    :param a1: value to print
    :param delay: seconds to sleep before printing (default 1)
    """
    try:
        time.sleep(delay)
        print(a1)
    finally:
        # Always give the slot back; otherwise a failing task would shrink
        # the pool for good and get_thread() could eventually block forever.
        pool.add_thread()


if __name__ == "__main__":
    # Demo: 50 tasks, at most 20 (the default pool size) running at once.
    p = threadpool()
    for i in range(50):
        ret = p.get_thread()
        t = ret(target=fun, args=(p, i))
        t.start()
二、版本二
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

# Sentinel put on the task queue to tell a worker thread to exit.
StopEvent = object()


class ThreadPool(object):
    """Thread pool that lazily spawns up to ``max_num`` worker threads.

    Tasks are ``(func, args, callback)`` tuples placed on a shared queue.
    Each worker loops in :meth:`call`, pulling and executing tasks until it
    receives ``StopEvent`` or sees that :meth:`terminate` was requested.
    """

    def __init__(self, max_num, max_task_num=None):
        """
        :param max_num: maximum number of worker threads
        :param max_task_num: maximum number of queued tasks; a falsy value
            means the queue is unbounded
        """
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False       # set by close(): no new tasks accepted
        self.terminal = False     # set by terminate(): workers stop asap
        self.generate_list = []   # worker threads created and not yet exited
        self.free_list = []       # workers currently waiting for a task

    def run(self, func, args, callback=None):
        """
        Submit one task to the pool.

        :param func: task function
        :param args: positional arguments for the task function (iterable)
        :param callback: optional function called after the task finishes,
            successfully or not, as ``callback(success, result)``; ``result``
            is the task's return value, or None if the task raised
        :return: True if the pool has been closed or terminated (the task is
            not queued), otherwise None
        """
        if self.cancel or self.terminal:
            return True
        if not self.free_list and len(self.generate_list) < self.max_num:
            self.generate_thread()
        self.q.put((func, args, callback))

    def generate_thread(self):
        """
        Create and start one worker thread.
        """
        t = threading.Thread(target=self.call)
        # Register before start() so run() and close() see the worker at
        # once. Registering from inside the new thread let quick successive
        # run() calls spawn more than max_num workers.
        self.generate_list.append(t)
        try:
            t.start()
        except Exception:
            self.generate_list.remove(t)
            raise

    def call(self):
        """
        Worker loop: fetch tasks from the queue and execute them until told
        to stop.
        """
        current_thread = threading.current_thread()
        # Normally generate_thread() has registered us already; this keeps
        # a direct call() from another thread working as before.
        if current_thread not in self.generate_list:
            self.generate_list.append(current_thread)

        try:
            event = self.q.get()
            while event is not StopEvent:
                func, arguments, callback = event
                try:
                    result = func(*arguments)
                    success = True
                except Exception:
                    # Task failures are reported through the callback.
                    success = False
                    result = None

                if callback is not None:
                    try:
                        callback(success, result)
                    except Exception:
                        # Deliberate best-effort: a faulty callback must not
                        # kill the worker thread.
                        pass

                # Counted as free only while waiting for the next task.
                with self.worker_state(self.free_list, current_thread):
                    if self.terminal:
                        event = StopEvent
                    else:
                        event = self.q.get()
        finally:
            # Deregister even on an unexpected exit so close()/terminate()
            # never wait on a dead worker.
            self.generate_list.remove(current_thread)

    def close(self):
        """
        Stop accepting tasks; workers exit after all queued tasks have run.
        """
        self.cancel = True
        # One sentinel per live worker, queued behind the pending tasks.
        for _ in range(len(self.generate_list)):
            self.q.put(StopEvent)

    def terminate(self):
        """
        Stop the workers whether or not tasks remain queued.

        Queued tasks are discarded; a task that is already executing runs to
        completion. Blocks until every worker has exited.
        """
        self.terminal = True
        # Drop pending tasks first so idle workers cannot pick them up.
        self._discard_queued()

        while self.generate_list:
            # Keep at most one sentinel per live worker in the queue instead
            # of flooding it; put_nowait never blocks on a bounded queue.
            for _ in range(len(self.generate_list) - self.q.qsize()):
                try:
                    self.q.put_nowait(StopEvent)
                except queue.Full:
                    break
            time.sleep(0.01)

        # Busy workers exit via the terminal flag without consuming their
        # sentinel; remove whatever is left over.
        self._discard_queued()

    def _discard_queued(self):
        """Remove every item currently on the task queue."""
        try:
            while True:
                self.q.get_nowait()
        except queue.Empty:
            pass

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        Keep ``worker_thread`` in ``state_list`` for the duration of the
        ``with`` body (used to track which workers are waiting for a task).
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)


# How to use


def callback(status, result):
    # status: True if the task finished without raising
    # result: the task's return value (None on failure)
    pass


def action(i):
    print(i)


if __name__ == "__main__":
    pool = ThreadPool(5)

    for i in range(30):
        ret = pool.run(action, (i,), callback)

    time.sleep(5)
    print(len(pool.generate_list), len(pool.free_list))
    print(len(pool.generate_list), len(pool.free_list))
    # Stop the workers so the script can exit; use pool.terminate() instead
    # to drop tasks that are still queued.
    pool.close()