博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
多线程池
阅读量:5921 次
发布时间:2019-06-19

本文共 4208 字,大约阅读时间需要 14 分钟。

一、low版

1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import threading 4 import queue 5 import time 6 class threadpool: 7     def __init__(self,max_num=20): 8         self.queue = queue.Queue(max_num) 9         for i in range(max_num):10             self.queue.put(threading.Thread)11     def get_thread(self):12         return self.queue.get()13     def add_thread(self):14         self.queue.put(threading.Thread)15 16 def fun(pool,a1):17     time.sleep(1)18     print(a1)19     pool.add_thread()20 p = threadpool()21 for i in range(50):22     ret = p.get_thread()23     t= ret(target= fun,args=(p,i,))24     t.start()

 二、版本二

1 #!/usr/bin/env python  2 # -*- coding:utf-8 -*-  3   4 import queue  5 import threading  6 import contextlib  7 import time  8   9 StopEvent = object() 10  11  12 class ThreadPool(object): 13  14     def __init__(self, max_num, max_task_num = None): 15         if max_task_num: 16             self.q = queue.Queue(max_task_num) 17         else: 18             self.q = queue.Queue() 19         self.max_num = max_num 20         self.cancel = False 21         self.terminal = False 22         self.generate_list = [] 23         self.free_list = [] 24  25     def run(self, func, args, callback=None): 26         """ 27         线程池执行一个任务 28         :param func: 任务函数 29         :param args: 任务函数所需参数 30         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) 31         :return: 如果线程池已经终止,则返回True否则None 32         """ 33         if self.cancel: 34             return 35         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 36             self.generate_thread() 37         w = (func, args, callback,) 38         self.q.put(w) 39  40     def generate_thread(self): 41         """ 42         创建一个线程 43         """ 44         t = threading.Thread(target=self.call) 45         t.start() 46  47     def call(self): 48         """ 49         循环去获取任务函数并执行任务函数 50         """ 51         current_thread = threading.currentThread() 52         self.generate_list.append(current_thread) 53  54         event = self.q.get() 55         while event != StopEvent: 56  57             func, arguments, callback = event 58             try: 59                 result = func(*arguments) 60                 success = True 61             except Exception as e: 62                 success = False 63                 result = None 64  65             if callback is not None: 66                 try: 67                     callback(success, result) 68                 except Exception as e: 69                     pass 70  71             with self.worker_state(self.free_list, current_thread): 72                 if self.terminal: 73                     event = StopEvent 74                 else: 75                     event = self.q.get() 76         else: 77  78             self.generate_list.remove(current_thread) 79  80     def close(self): 81         """ 82         执行完所有的任务后,所有线程停止 83         """ 84         self.cancel = True 85         full_size = len(self.generate_list) 86         while full_size: 87             self.q.put(StopEvent) 88             full_size -= 1 89  90     def terminate(self): 91         """ 92         无论是否还有任务,终止线程 93         """ 94         self.terminal = True 95  96         while self.generate_list: 97             self.q.put(StopEvent) 98  99         self.q.queue.clear()100 101     @contextlib.contextmanager102     def worker_state(self, state_list, worker_thread):103         """104         用于记录线程中正在等待的线程数105         """106         state_list.append(worker_thread)107         try:108             yield109         finally:110             state_list.remove(worker_thread)111 112 113 114 # How to use115 116 117 pool = ThreadPool(5)118 119 def callback(status, result):120     # status, execute action status121     # result, execute action return value122     pass123 124 125 def action(i):126     print(i)127 128 for i in range(30):129     ret = pool.run(action, (i,), callback)130 131 time.sleep(5)132 print(len(pool.generate_list), len(pool.free_list))133 print(len(pool.generate_list), len(pool.free_list))134 # pool.close()135 # pool.terminate()

 

转载于:https://www.cnblogs.com/Erick-L/p/6502576.html

你可能感兴趣的文章
JPA
查看>>
LocalStorage存储JSON对象的问题
查看>>
PHP数组的定义
查看>>
Java进阶篇设计模式之九----- 解释器模式和迭代器模式
查看>>
UOJ#54 BZOJ3434 [WC2014]时空穿梭
查看>>
48. Rotate Image
查看>>
SPOJ GSS1 & GSS3&挂了的GSS5
查看>>
linux下安装jdk,tomcat,maven
查看>>
selenium - 键盘操作
查看>>
P1010 幂次方
查看>>
AngularJS7那些不得不说的事故
查看>>
使用原理视角看 Git
查看>>
避免头文件重复包含
查看>>
libgdx 3D 渲染优化
查看>>
Shell编程之文本处理
查看>>
洛谷P2178 品酒大会
查看>>
Spring操作mongo排序,限制查询记录数
查看>>
ES6学习入门
查看>>
教育博客有哪些作用?
查看>>
汇编语言之中断学习
查看>>