站内链接:
多进程 基本知识点 进程
, 一个正在执行程序的实例, 是 OS 资源分配的最小单位. 根据<现代操作系统>
的解释, 我们可以这样理解进程, CPU, 程序之间的关系(例子的整体逻辑:一个有好厨艺的计算机科学家正在为他的女儿做生日蛋糕):
CPU: 计算机科学家, 实现逻辑的真正主体
程序: 做蛋糕的食谱, 实现逻辑
输入数据: 做蛋糕的各种原材料
输出数据: 最终的生日蛋糕以及这过程中产生的任何半成品
进程: 厨师阅读食谱, 取来各种原材料, 以及进行烘焙的一系列动作的总和
一个进程就是某种类型的一个活动, 其有程序, 输入, 输出, 状态. 每一个进程都有一个虚拟的 CPU, 有用自己独立的虚拟地址空间. 那么 Python 中如何使用多进程呢?
python 语言不同于JAVA/C
语言, 由于 GIL(Global interpreter Lock ,全局解释锁)的存在, 程序只能实现并发(Concurrency), 并不能实现并行(Parallelism). 为了充分利用 CPU, 在并行计算场景中, 就必须使用多进程(当然, 由于 CPU 的进程切换消耗, 并非进程越多越好). python 中进程实现方式有以下几种:
使用 C 语言优化并行代码, 绕过 GIL 限制, 这个除非是要求高性能的程序, 一般不会用到
使用 multiprocessing 来实现多进程
使用 subprocess 搭配线程/协程
来进行系统调用, 实际上是os.system
等的替代方式
multiprocessing
基本用法, 传递参数, 主进程等待派生进程的结束
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import osfrom multiprocessing import Processdef func1 (arg1, arg2, arg3 ): """传入三个参数""" print ("args1:" , arg1) print ("args2:" , arg2) print ("args3:" , arg3) if __name__ == "__main__" : """main""" p2 = Process(target=func1, args=(3 , 4 , 5 )) p2.start() p2.join() print ("p2, end" )
使用可调用对象来传递参数来实现多进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 """ 测试多进程, 并加入可调用对象的测试 """ import multiprocessingimport timeclass CallableFucn (object ): """可调用对象""" def __init__ (self, func, args ): """init""" self._argsList = args self._func = func def __call__ (self ): """call""" self._func(*self._argsList) def counter (n, b ): """ 打印自身 """ print ('这是一个被可调用对象操作的子进程函数: {}' .format (n * b)) time.sleep(2 ) if __name__ == '__main__' : p1 = multiprocessing.Process(target=CallableFucn(counter, args=(3 , 4 ))) p1.start() for _ in range (10 ): print ('---主进程等待中---' ) p1.join() print ('===============' )
使用 multiprocesing 派生进程实现多进程, 这是比较常用的方式, 特别在线程池, 进程池中使用.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 """ 派生进程多进程测试 """ import multiprocessingimport timeclass MyProcess (multiprocessing.Process): def __init__ (self, name ): self.name = name super (MyProcess, self).__init__() def run (self ): """ 启动函数 """ print ('派生进程:{} 准备启动中' .format (self.name)) time.sleep(2 ) if __name__ == '__main__' : p = MyProcess('SubProcess' ) p.start() p.join() print ('=====================' )
利用队列, 实现多个进程之间的消息传递, 父进程向子进程推送消息, 子进程进行任务的处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 """ 多进程利用队列来进行传递消息 """ import timeimport queueimport multiprocessingdef worker (q ): """ 消费者 """ name = multiprocessing.current_process().name for _ in range (3 ): try : message = q.get(timeout=2 ) if message: print ('子进程{}收到消息:{}' .format (name, message)) except queue.Empty: print ('-----------子进程结束-------------' ) break if __name__ == '__main__' : q = multiprocessing.Queue() p = multiprocessing.Process(target=worker, args=(q,)) p.start() for i in range (3 ): q.put(i) q.close() q.join_thread() p.join() print ('==========================' )
共享资源同步写入锁操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 """ 同步锁操作 """ import timeimport sysimport multiprocessingstream = sys.stdout def lock_with_worker (lock ): """ 进程锁写数据 """ name = multiprocessing.current_process().name for _ in range (3 ): with lock: stream.write('proces:{} ready write message\r\n' .format (name)) stream.flush() time.sleep(0.5 ) def lock_nowith_worker (lock ): """ 进程2写数据 """ name = multiprocessing.current_process().name for _ in range (3 ): lock.acquire() try : stream.write('process:{} du du du du\r\n' .format (name)) stream.flush() finally : lock.release() time.sleep(0.3 ) if __name__ == '__main__' : lock = multiprocessing.Lock() p1 = multiprocessing.Process(name='with' , target=lock_with_worker, args=(lock,)) p2 = multiprocessing.Process(name='nowith' , target=lock_nowith_worker, args=(lock,)) p1.start() p2.start() p1.join() p2.join()
关于锁相关知识见锁分类介绍
进程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 """ 进程池操作 """ import timeimport sysimport multiprocessingdef start_func (): """ 仅仅对池中进程进行初始化操作, 并非真正的回调函数 """ name = multiprocessing.current_process().name print ('进程池初始函数, 名字:{}.....' .format (name)) def show_myself (data ): name = multiprocessing.current_process().name print ('{}: {}' .format (name, data)) if __name__ == '__main__' : pool_size = multiprocessing.cpu_count() * 2 pool = multiprocessing.Pool(processes=pool_size, initializer=start_func, maxtasksperchild=2 ) input_datas = ['aa' , '33' , 'bb' , 'dd' , '想' , '你妹' , '哇哈' , '30' , 'omg' ] for i in range (4 ): if i % 2 == 0 : pool_output = pool.map (show_myself, input_datas) else : pool_output = pool.apply(show_myself, [input_datas]) pool.close() pool.join()
关于上下文相关知识点见内核上下文术语 介绍. 上面的例子中我们看到进程池的迭代启动通过 map 方法, 下面简单介绍下迭代启动的几个常用方法:
apply(func [, args [, kwarsg]])
, 参数类似 javascript 中的 apply, 阻塞调用, 其更适合在多线程中使用, 否则可能看起来就像是没有并发一样
apply_async(func, args, kwargs, callback, error_callback)
, 非阻塞调用
map(func, iterable, chunksize)
, 内置 map 函数的类似逻辑, 阻塞调用, 其会迭代 iterable 每一个元素并逐一创建任务放到进程池中
map_async(func, iterable, chunksize, callback, error_callback)
, 非阻塞调用
最后, 在实际项目开发中, 如果使用了成熟的分布式或者其他框架时, 一般项目开发的时候很少会用到多进程, 此时就需要对框架有一定的了解以便进行深度开发, 或者碰到一些错误或者疑难问题时无从下手. 当然, 如果需要对自己的分布式项目完全可控并且还有足够的人力资源, 那么可以自己编写多进程框架来完全掌控项目, 此时就会用到多进程, 进程池的概念.
subprocess subprocess
用于创建和处理附加进程
, 用于替换os.system, popen
等老函数. 那么 subprocess 和 multiprocess 有什么区别呢? 实际上他们两者的定位有区别:
multiprocess: 通常意义上的多继承开发, 同一个代码中通过多进程调用其他模块实现并发操作
subprocess: 类似os.system
, 直接调用外部的二进制程序或者脚本, 其依托于外部的环境
实际项目运行过程中, subprocess 的使用频率一般比 multiprocess 高.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import subprocesssubprocess.call(['ls' , '-l' ]) subprocess.run(['ls' , '-l' ]) proc = subprocess.run(['uname' , '-r' ], stdout=su.PIPE) print (proc.returncode, proc.stdout)ls = sub.Popen(['ls' ], stdout=sub.PIPE) for f in ls.stdout: print (f) ex = sub.Popen(['ex' , 'test.txt' ], stdin=sub.PIPE) ex.stdin.write(b'i\nthis is a test\n.\n' )
相比于 call 和 run 方法, Popen 更加偏底层实现, 其提供了更多的功能, 使用起来可能更加复杂一点, 下面是 Popen 对象的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 proc.poll() proc.wait(timeout) proc.communicate(input , timeout) proc.send_signal(signal) proc.terminate() proc.kill()
实际例子如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import subprocessimport timesubp = subprocess.Popen('bash /shell/long_sleep.sh' , shell=True , stdout=subprocess.PIPE,stderr=subprocess.PIPE,encoding="utf-8" ) returncode = None while True : try : subp.wait(2 ) except Exception as msg: print (f'等待子进程处理超时:{msg} ' ) returncode = subp.pool() if returncode is None : print ('子进程仍然在处理中, 请等待...' ) continue break if returncode != 0 : print (f'子进程命令支持异常:{returncode} ' ) else : print (subp.communicate()[1 ]) subp.terminal()
线程 基本知识点 上一个章节已经讲述的进程的相关知识, 这里我们会深入进程, 讲述一个更加精细化的定义–线程. 线程
是 OS 能够进行运算调度的最小单位, 其包含在进程中, 条线程指的是进程中一个单一顺序的控制流. 实际上, 线程除了不持有唯一的虚拟地址空间, 文件描述符等资源之外, 其他都类似进程. 那么线程有什么用处呢? 其能够解决什么问题呢?
线程提供了程序并行处理
的逻辑, 而且这些并行处理共享相同的资源(这是和多进程的差别)
线程相比进程更加轻量级, 在一些系统中, 线程的创建要比进程的创建快10~100
倍
线程在 IO 密集型应用中能够极大的优化执行效率
线程在一些多 CPU 系统(特定语言)中会实现真正的并行
当然, 线程类似进程, 虽然他有各种好处, 但是并发编程实际上大大的提升了编程的难度, 这其中涉及的共享资源锁问题, 锁问题导致的性能问题, 死锁问题等等不仅仅在多进程编程中发生, 在多线程编程中也会发生.
应用
创建基本线程. 注意, 多线程的执行是不确定性的
, 这是线程的核心特征.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 """ 线程基本测试, 创建一个计数线程 """ import timeimport threadingclass CountDownTask (object ): def __init__ (self ): self._running = True def terminate (self ): self._running = False def run (self, n ): while self._running and n > 0 : print ('时间:' , n) n -= 1 time.sleep(1 ) if __name__ == '__main__' : c = CountDownTask() t = threading.Thread(target=c.run, args=(10 ,)) t.start() time.sleep(3 ) c.terminate() t.join() print ('=' * 10 )
有没有发现上面线程的用法同第一章节的多进程multiprocessing
的使用方法极其类似, 实际上多进程库就是参考threading
线程库接口而设计的, 两者中大部分函数和用法都类似.
守护线程. 注意, 守护线程
并非守护进程
, 其仅仅用于某些特定场景(例如垃圾收集器), 一个守护线程在创建并开始执行后, 不会干扰main thread
的结束, 而在所有非守护线程
结束后, 守护线程会自动销毁,所以, 守护线程必须处理好主动销毁后的操作.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 """ 守护线程测试 """ import timeimport datetimeimport threadingdef show_time (): """ 打印当前时间 """ name = threading.current_thread().name while True : current_time = str (datetime.datetime.now()) print ('{}, 当前时间:{}' .format (name, current_time)) time.sleep(1 ) def count_down (n ): name = threading.current_thread().name while n > 0 : print ('{}, 计数:{}' .format (name, n)) time.sleep(0.5 ) n -= 1 if __name__ == '__main__' : daemon_t = threading.Thread(target=show_time, name='daemon' , daemon=True ) daemon_t.start() c_t = threading.Thread(target=count_down, name='count' , args=(10 ,)) c_t.start() c_t.join()
Event
对象, 用于线程之间的消息通知或者共享资源的使用, 不过 Event 对象一般用于一次性
的事件. 创建一个事件, 在设置完相关标记之后, 一般就会被弃用了. 注意, Event
的通知会告知所有等待通知的所有线程.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 """ 测试threading.Event """ import threadingimport timeclass TestThread (threading.Thread): """ 多线程派生类实现, 多进程也有类似的逻辑: a. multiprocessing.Process b. run() """ def __init__ (self, name, event ): super (TestThread, self).__init__() self.name = name self.event = event def run (self ): print ('Thread: ' , self.name, ' start at:' , time.ctime(time.time())) self.event.wait() print ('Thread: ' , self.name, ' finish at:' , time.ctime(time.time())) if __name__ == '__main__' : event = threading.Event() threads = [] for i in range (1 , 3 ): threads.append(TestThread(str (i), event)) print ('main thread start at: ' , time.ctime(time.time())) event.clear() for thread in threads: thread.start() print ('sleep 5 seconds.......' ) time.sleep(5 ) print ('主线程准备设置标志位, 通知所有其他线程' ) event.set ()
前面提过Event
对象用于一次性
的事件通知, 如果线程希望重复的进行事件通知, 可以用condition
对象. Event
对象会通知所有等待线程, Condition
或者Semaphore
存在接口通知一个单独的等待线程.注意, Condition 也是一个互斥锁, 不管上面的 Event 对象, 还是 Condition 对象, wait
函数的实现机制都是类似的:
首先要求已经获得 Lock(前提条件)
wait 方法释放底层 Lock
wait 方法所处线程进入阻塞状态, 等待 notify
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 """ Condition线程通知事件测试 """ import timeimport threadingclass ConditionTimer (object ): def __init__ (self, interval ): """ 一个定时器线程, 在超时后发送信号, 唤醒所有等待该条件的线程. """ self._interval = interval self._flag = 0 self._cv = threading.Condition() def start (self ): """ 确保定时器以后台线程方式运行 """ t = threading.Thread(target=self.run) t.daemon = True t.start() def run (self ): while True : time.sleep(self._interval) with self._cv: self._flag ^= 1 self._cv.notify_all() def wait_for_tick (self ): """ 等待定时器通知 """ with self._cv: last_flag = self._flag while last_flag == self._flag: self._cv.wait() def count_down (ptimer, nticks ): """ 滴答递减操作 """ while nticks > 0 : ptimer.wait_for_tick() print ('Count Down 滴答:' , nticks) nticks -= 1 def count_up (ptimer, nticks ): """ 滴答递增操作 """ n = 0 while n < nticks: ptimer.wait_for_tick() print ('Count up 加加:' , n) n += 1 if __name__ == '__main__' : ptimer = ConditionTimer(1 ) ptimer.start() p1 = threading.Thread(target=count_down, args=(ptimer,10 )) p1.start() p2 = threading.Thread(target=count_up, args=(ptimer,5 )) p2.start() p1.join() p2.join() print ('**' * 5 )
线程之间除了进行信号量传递之外, 还需要进行安全的通信或者消息传递, 否则直接访问共享内存必定会碰到资源竞争问题. python 中一般使用queue
模块来安全的实现消息传递.
1 2 3 4 5 6 import queuein_q = out_q = queue.Queue() in_q.put('线程 1:将消息推入队列中' ) out_q.get('线程 2: 获取队列中的消息' )
线程池
无限循环分发任务的的线程池管理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 """ 线程池测试 """ import socketfrom concurrent.futures import ThreadPoolExecutordef echo_client (sock, client_addr ): print ('收到socket消息, 地址:' , client_addr) while True : msg = sock.recv(65536 ) if not msg: break sock.sendall(msg) sock.close() def echo_server (addr ): pool = ThreadPoolExecutor(128 ) sock = socket.socket(socket.AF_INET, SOCK_STREAM) sock.bind(addr) sock.listen(5 ) while True : client_sock, client_addr = sock.accept() pool.submit(echo_client, client_sock, client_addr) if __name__ == '__main__' : echo_server(('' , 15000 ))
在正常项目开发中突然使用多线程以加速获取某一个资源流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 @Singleton class BambooPool : def __init__ (self ): self._maxnum = 0 self.pool = None self.tasks = queue.Queue() @lazyproperty def maxnum (self ): if self._maxnum > 0 : return self._maxnum self._maxnum = current_app.config.get('BAMBOO_POOL_MAX_NUM' , 16 ) return self._maxnum def create (self ): if self.pool: try : self.pool.shutdown() except : pass try : self.pool = ThreadPoolExecutor(max_workers=self.maxnum) except Exception as msg: current_app.logger.exception(f'bamboo pool创建发生异常:{msg} ' ) raise BambooStartNodeNotFound(f'发生未知异常:{msg} ' ) else : current_app.logger.info(f'bamboo pool创建成功' ) def submit_one_task (self, cb, *args, **kwargs ): if not self.pool: self.create() try : app = current_app._get_current_object() task = self.pool.submit(cb, app, *args, **kwargs) except Exception as msg: current_app.logger.exception(f'bamboo pool提交task异常:{msg} ' ) self.create() task = self.pool.submit(cb, *args, **kwargs) return task def submit_and_save_task (self, cb, *args, **kwargs ): task = self.submit_one_task(cb, *args, **kwargs) self.tasks.put(task) def async_wait_task (self, tasks=None , timeout=120 ): """ 等待任务结果处理完成 """ start = int (time.time()) if not tasks: tasks = self.tasks num, sleep_interval = 0 , 0.1 try : while True : time.sleep(sleep_interval) num += 1 if num * sleep_interval > timeout: current_app.logger.error(f'bamboo pool等待任务结果超时:{num} , 超时时间:{timeout} ' ) break _task = tasks.get(False ) if not _task: continue if _task.done(): continue tasks.put(_task) except queue.Empty: return True except Exception as msg: current_app.logger.exception(f'bamboo pool等待任务结果异常:{msg} ' ) return False return False def wait_special_tasks (self, tasks, timeout=120 ): num, sleep_interval = 0 , 0.5 if not tasks: return False try : while True : if not tasks: current_app.logger.info(f'bamboo pool所有任务均处理结束, 轮询等待结束' ) return True time.sleep(sleep_interval) num += 1 if num * sleep_interval > timeout: current_app.logger.error(f'bamboo pool等待任务结果超时:{num} , 超时时间:{timeout} ' ) break undo_tasks = [] for _task in tasks: if not _task: continue if not _task.done(): undo_tasks.append(_task) tasks = undo_tasks except Exception as msg: current_app.logger.exception(f'bamboo pool等待任务结果异常:{msg} ' ) return False return False async_tasks = [] drcc_pool = DrccPool() task = drcc_pool.submit_one_task(flows, id , nids, False , skids) async_tasks.append(task) drcc_pool.wait_special_tasks(async_tasks)
这里基于 Python 的 ThreadPoolExecutor 实现了线程池, 各个函数说明如下:
submit: 非阻塞提交线程到线程池中并返回任务句柄
done: 判断某一个任务是否结束
result: 获取任务的返回值
wait: 等待任务结束, 不过这个不够灵活
协程在 python 中使用 关于协程的相关知识点介绍见同步编程到异步编程的技术演进 第三章的介绍, 这里主要讲解下协程在 python 环境下的落地实现, 具体见python 协程 .
参考
书籍参考:
<现代操作系统>
<python cookbook>
<python标准库>