站内链接:

多进程

基本知识点

进程, 一个正在执行程序的实例, 是 OS 资源分配的最小单位. 根据<现代操作系统>的解释, 我们可以这样理解进程, CPU, 程序之间的关系(例子的整体逻辑:一个有好厨艺的计算机科学家正在为他的女儿做生日蛋糕):

  • CPU: 计算机科学家, 实现逻辑的真正主体
  • 程序: 做蛋糕的食谱, 实现逻辑
  • 输入数据: 做蛋糕的各种原材料
  • 输出数据: 最终的生日蛋糕以及这过程中产生的任何半成品
  • 进程: 厨师阅读食谱, 取来各种原材料, 以及进行烘焙的一系列动作的总和

一个进程就是某种类型的一个活动, 其有程序, 输入, 输出, 状态. 每一个进程都有一个虚拟的 CPU, 有用自己独立的虚拟地址空间. 那么 Python 中如何使用多进程呢?

python 语言不同于JAVA/C语言, 由于 GIL(Global interpreter Lock ,全局解释锁)的存在, 程序只能实现并发(Concurrency), 并不能实现并行(Parallelism). 为了充分利用 CPU, 在并行计算场景中, 就必须使用多进程(当然, 由于 CPU 的进程切换消耗, 并非进程越多越好). python 中进程实现方式有以下几种:

  1. 使用 C 语言优化并行代码, 绕过 GIL 限制, 这个除非是要求高性能的程序, 一般不会用到
  2. 使用 multiprocessing 来实现多进程
  3. 使用 subprocess 搭配线程/协程来进行系统调用, 实际上是os.system等的替代方式

multiprocessing

  1. 基本用法, 传递参数, 主进程等待派生进程的结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# coding:utf-8
import os

from multiprocessing import Process


def func1(arg1, arg2, arg3):
"""传入三个参数"""
print("args1:", arg1)
print("args2:", arg2)
print("args3:", arg3)


if __name__ == "__main__":
"""main"""
p2 = Process(target=func1, args=(3, 4, 5))
p2.start()
p2.join()
print("p2, end")
  1. 使用可调用对象来传递参数来实现多进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# coding:utf8
""" 测试多进程, 并加入可调用对象的测试 """
import multiprocessing
import time


class CallableFucn(object):
"""可调用对象"""
def __init__(self, func, args):
"""init"""
self._argsList = args
self._func = func

def __call__(self):
"""call"""
# a. 做一些前置操作
# b. 真正调用
self._func(*self._argsList)
# c. 后置操作


def counter(n, b):
""" 打印自身 """
print('这是一个被可调用对象操作的子进程函数: {}'.format(n * b))
time.sleep(2)


if __name__ == '__main__':
# 1. 此时传递参数通过可调用对象来操作
p1 = multiprocessing.Process(target=CallableFucn(counter, args=(3, 4)))
p1.start()
for _ in range(10):
print('---主进程等待中---')
p1.join()
print('===============')
  1. 使用 multiprocesing 派生进程实现多进程, 这是比较常用的方式, 特别在线程池, 进程池中使用.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
""" 派生进程多进程测试 """
import multiprocessing
import time


class MyProcess(multiprocessing.Process):
def __init__(self, name):
self.name = name
super(MyProcess, self).__init__()

def run(self):
""" 启动函数 """
print('派生进程:{} 准备启动中'.format(self.name))
time.sleep(2)


if __name__ == '__main__':
p = MyProcess('SubProcess')
p.start()
p.join()
print('=====================')
  1. 利用队列, 实现多个进程之间的消息传递, 父进程向子进程推送消息, 子进程进行任务的处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
""" 多进程利用队列来进行传递消息 """
import time
import queue
import multiprocessing


def worker(q):
""" 消费者 """
name = multiprocessing.current_process().name # 进程ID号
for _ in range(3):
try:
message = q.get(timeout=2)
if message:
print('子进程{}收到消息:{}'.format(name, message))
except queue.Empty:
print('-----------子进程结束-------------')
break


if __name__ == '__main__':
# 1. 消息队列
q = multiprocessing.Queue()
# 2. 启动一个子进程, 注意这不是进程池(使用Pool创建)
p = multiprocessing.Process(target=worker, args=(q,))
p.start()

# 生产者
for i in range(3):
q.put(i)
# 等待消费者
q.close()
q.join_thread()
p.join()
print('==========================')
  1. 共享资源同步写入锁操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
""" 同步锁操作 """
import time
import sys
import multiprocessing


stream = sys.stdout


def lock_with_worker(lock):
""" 进程锁写数据 """
name = multiprocessing.current_process().name
for _ in range(3):
with lock:
stream.write('proces:{} ready write message\r\n'.format(name))
stream.flush()
time.sleep(0.5)


def lock_nowith_worker(lock):
""" 进程2写数据 """
name = multiprocessing.current_process().name
for _ in range(3):
lock.acquire()
try:
stream.write('process:{} du du du du\r\n'.format(name))
stream.flush()
finally:
lock.release() # 释放锁
time.sleep(0.3)


if __name__ == '__main__':
lock = multiprocessing.Lock()
p1 = multiprocessing.Process(name='with', target=lock_with_worker, args=(lock,))
p2 = multiprocessing.Process(name='nowith', target=lock_nowith_worker, args=(lock,))
p1.start()
p2.start()

p1.join()
p2.join()

关于锁相关知识见锁分类介绍

  1. 进程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
""" 进程池操作 """
import time
import sys
import multiprocessing


def start_func():
""" 仅仅对池中进程进行初始化操作, 并非真正的回调函数 """
name = multiprocessing.current_process().name # 这里实际为进程表示ID(整型), 非PID
print('进程池初始函数, 名字:{}.....'.format(name))


def show_myself(data):
name = multiprocessing.current_process().name
print('{}: {}'.format(name, data))


if __name__ == '__main__':
# 1. 创建进程池
pool_size = multiprocessing.cpu_count() * 2
# processes: 工作进程数, 若为None, 则默认为os.cpu_count()
# initializer: 进程初始化
# maxtasksperchild: 工作进程从启动到结束可以处理的任务数, 若为None则一直存在
# context: 工作进程启动时的上下文
pool = multiprocessing.Pool(processes=pool_size, initializer=start_func, maxtasksperchild=2)

# 2. 获取准备传入多个进程的各个数据
input_datas = ['aa', '33', 'bb', 'dd', '想', '你妹', '哇哈', '30', 'omg']

# 3. 迭代启动
for i in range(4):
if i % 2 == 0:
pool_output = pool.map(show_myself, input_datas) # 这里实际上发送了多个单独任务到进程池中
else:
pool_output = pool.apply(show_myself, [input_datas])

# 4. 关闭
pool.close() # 防止任何更多的任务被提交到池中, 关闭进程池的新建操作, 一旦完成所有任务直接退出
pool.join() # 等待工作进程退出, 需要在close或者terminal(立即停止工作进程)之前调用

关于上下文相关知识点见内核上下文术语介绍. 上面的例子中我们看到进程池的迭代启动通过 map 方法, 下面简单介绍下迭代启动的几个常用方法:

  • apply(func [, args [, kwarsg]]), 参数类似 javascript 中的 apply, 阻塞调用, 其更适合在多线程中使用, 否则可能看起来就像是没有并发一样
  • apply_async(func, args, kwargs, callback, error_callback), 非阻塞调用
  • map(func, iterable, chunksize), 内置 map 函数的类似逻辑, 阻塞调用, 其会迭代 iterable 每一个元素并逐一创建任务放到进程池中
  • map_async(func, iterable, chunksize, callback, error_callback), 非阻塞调用

最后, 在实际项目开发中, 如果使用了成熟的分布式或者其他框架时, 一般项目开发的时候很少会用到多进程, 此时就需要对框架有一定的了解以便进行深度开发, 或者碰到一些错误或者疑难问题时无从下手. 当然, 如果需要对自己的分布式项目完全可控并且还有足够的人力资源, 那么可以自己编写多进程框架来完全掌控项目, 此时就会用到多进程, 进程池的概念.

subprocess

subprocess用于创建和处理附加进程, 用于替换os.system, popen等老函数. 那么 subprocess 和 multiprocess 有什么区别呢? 实际上他们两者的定位有区别:

  • multiprocess: 通常意义上的多继承开发, 同一个代码中通过多进程调用其他模块实现并发操作
  • subprocess: 类似os.system, 直接调用外部的二进制程序或者脚本, 其依托于外部的环境

实际项目运行过程中, subprocess 的使用频率一般比 multiprocess 高.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import subprocess

# 1. call(args, stdin=None, stdout=None, stderr=None, shell=False): 阻塞执行子进程直到子调用返回
subprocess.call(['ls', '-l'])

# 2. run(args, stdin=None, stdout=None, stderr=None, capture_output=False, shell=False): 这是call方法的替代方法, 建议使用run进行子进程调用
subprocess.run(['ls', '-l'])
proc = subprocess.run(['uname', '-r'], stdout=su.PIPE)
print(proc.returncode, proc.stdout)

# 3. Popen(args, bugsize=-1), 更底层调用函数, 其更加的灵活和便利, 在一个新的进程中执行子程序: os.execvpe(). 注意, Popen调用的时候不会阻塞而是返回一个handler或者指针
# --> 输出
ls = sub.Popen(['ls'], stdout=sub.PIPE)
for f in ls.stdout:
print(f)
# --> 输入
ex = sub.Popen(['ex', 'test.txt'], stdin=sub.PIPE)
ex.stdin.write(b'i\nthis is a test\n.\n')

相比于 call 和 run 方法, Popen 更加偏底层实现, 其提供了更多的功能, 使用起来可能更加复杂一点, 下面是 Popen 对象的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# a. 检查进程是否终止,如果终止返回returncode,否则返回None
proc.poll()

# b. 阻塞等待子进程终止
proc.wait(timeout)

# c. 阻塞并和子进程进行stdin/stdout交互
proc.communicate(input, timeout)

# d. 向子进程发送信号
proc.send_signal(signal)

# e. 停止子进程: 发送SIGTERM
proc.terminate()

# f. 强制停止子进程: 发送SIGKILL
proc.kill()

实际例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import subprocess
import time

subp = subprocess.Popen('bash /shell/long_sleep.sh', shell=True, stdout=subprocess.PIPE,stderr=subprocess.PIPE,encoding="utf-8")
returncode = None
while True:
try:
subp.wait(2)
except Exception as msg:
print(f'等待子进程处理超时:{msg}')
returncode = subp.pool()
if returncode is None:
print('子进程仍然在处理中, 请等待...')
continue
break

if returncode != 0:
print(f'子进程命令支持异常:{returncode}')
else:
print(subp.communicate()[1])

subp.terminal()

线程

基本知识点

上一个章节已经讲述的进程的相关知识, 这里我们会深入进程, 讲述一个更加精细化的定义–线程. 线程是 OS 能够进行运算调度的最小单位, 其包含在进程中, 条线程指的是进程中一个单一顺序的控制流. 实际上, 线程除了不持有唯一的虚拟地址空间, 文件描述符等资源之外, 其他都类似进程. 那么线程有什么用处呢? 其能够解决什么问题呢?

  • 线程提供了程序并行处理的逻辑, 而且这些并行处理共享相同的资源(这是和多进程的差别)
  • 线程相比进程更加轻量级, 在一些系统中, 线程的创建要比进程的创建快10~100
  • 线程在 IO 密集型应用中能够极大的优化执行效率
  • 线程在一些多 CPU 系统(特定语言)中会实现真正的并行

当然, 线程类似进程, 虽然他有各种好处, 但是并发编程实际上大大的提升了编程的难度, 这其中涉及的共享资源锁问题, 锁问题导致的性能问题, 死锁问题等等不仅仅在多进程编程中发生, 在多线程编程中也会发生.

应用

  1. 创建基本线程. 注意, 多线程的执行是不确定性的, 这是线程的核心特征.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
""" 线程基本测试, 创建一个计数线程 """
import time
import threading


class CountDownTask(object):
def __init__(self):
self._running = True

def terminate(self):
self._running = False

def run(self, n):
while self._running and n > 0:
print('时间:', n)
n -= 1
time.sleep(1)


if __name__ == '__main__':
c = CountDownTask()
# 1. 多进程: multiprocessing.Process(target=func1, args=())
# 2. 多线程: threading.Thread(target=func1, args=())
t = threading.Thread(target=c.run, args=(10,))
t.start()
time.sleep(3)
c.terminate() # 发送SIGTERM信号
t.join()
print('=' * 10)

有没有发现上面线程的用法同第一章节的多进程multiprocessing的使用方法极其类似, 实际上多进程库就是参考threading线程库接口而设计的, 两者中大部分函数和用法都类似.

  1. 守护线程. 注意, 守护线程并非守护进程, 其仅仅用于某些特定场景(例如垃圾收集器), 一个守护线程在创建并开始执行后, 不会干扰main thread的结束, 而在所有非守护线程结束后, 守护线程会自动销毁,所以, 守护线程必须处理好主动销毁后的操作.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
""" 守护线程测试 """
import time
import datetime
import threading


def show_time():
""" 打印当前时间 """
name = threading.current_thread().name

# 在python主线程退出的时候会强制销毁
while True:
current_time = str(datetime.datetime.now())
print('{}, 当前时间:{}'.format(name, current_time))
time.sleep(1)


def count_down(n):
name = threading.current_thread().name
while n > 0:
print('{}, 计数:{}'.format(name, n))
time.sleep(0.5)
n -= 1


if __name__ == '__main__':
daemon_t = threading.Thread(target=show_time, name='daemon', daemon=True)
daemon_t.start()

c_t = threading.Thread(target=count_down, name='count', args=(10,))
c_t.start()
c_t.join() # 不需要等待守护线程
  1. Event对象, 用于线程之间的消息通知或者共享资源的使用, 不过 Event 对象一般用于一次性的事件. 创建一个事件, 在设置完相关标记之后, 一般就会被弃用了. 注意, Event的通知会告知所有等待通知的所有线程.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# coding:utf8
""" 测试threading.Event """
import threading
import time


class TestThread(threading.Thread):
""" 多线程派生类实现, 多进程也有类似的逻辑:
a. multiprocessing.Process
b. run()
"""
def __init__(self, name, event):
super(TestThread, self).__init__()
self.name = name
self.event = event

def run(self):
print('Thread: ', self.name, ' start at:', time.ctime(time.time()))
self.event.wait() # 等待事件被设置信号, 实际上也可以主线程等待其他线程设置标志位
print('Thread: ', self.name, ' finish at:', time.ctime(time.time()))


if __name__ == '__main__':
event = threading.Event()
threads = []
for i in range(1, 3):
threads.append(TestThread(str(i), event))

print('main thread start at: ', time.ctime(time.time()))
event.clear() # 在启动所有线程之前先置空标志位
for thread in threads:
thread.start()

print('sleep 5 seconds.......')
time.sleep(5)
print('主线程准备设置标志位, 通知所有其他线程')
event.set()
  1. 前面提过Event对象用于一次性的事件通知, 如果线程希望重复的进行事件通知, 可以用condition对象. Event对象会通知所有等待线程, Condition或者Semaphore存在接口通知一个单独的等待线程.注意, Condition 也是一个互斥锁, 不管上面的 Event 对象, 还是 Condition 对象, wait函数的实现机制都是类似的:
  • 首先要求已经获得 Lock(前提条件)
  • wait 方法释放底层 Lock
  • wait 方法所处线程进入阻塞状态, 等待 notify
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# coding:utf8
""" Condition线程通知事件测试 """
import time
import threading


class ConditionTimer(object):
def __init__(self, interval):
""" 一个定时器线程, 在超时后发送信号, 唤醒所有等待该条件的线程. """
self._interval = interval
self._flag = 0
self._cv = threading.Condition()

def start(self):
""" 确保定时器以后台线程方式运行 """
t = threading.Thread(target=self.run)
t.daemon = True
t.start()

def run(self):
# 定时器, 无限循环
while True:
time.sleep(self._interval)
with self._cv: # 语法糖, 获取锁
self._flag ^= 1 # 或非操作, 5^2==7, 5^1==4, 3^1==2
self._cv.notify_all() # 通知

def wait_for_tick(self):
""" 等待定时器通知 """
with self._cv:
last_flag = self._flag
while last_flag == self._flag: # 或非操作, 实际上就是一个轮回
self._cv.wait() # 该函数底层会自动释放锁


def count_down(ptimer, nticks):
""" 滴答递减操作 """
while nticks > 0:
ptimer.wait_for_tick()
print('Count Down 滴答:', nticks)
nticks -= 1


def count_up(ptimer, nticks):
""" 滴答递增操作 """
n = 0
while n < nticks:
ptimer.wait_for_tick()
print('Count up 加加:', n)
n += 1


if __name__ == '__main__':
# 启动定时器
ptimer = ConditionTimer(1)
ptimer.start()
# 启动滴答
p1 = threading.Thread(target=count_down, args=(ptimer,10))
p1.start()
p2 = threading.Thread(target=count_up, args=(ptimer,5))
p2.start()
p1.join()
p2.join()
print('**' * 5)
  1. 线程之间除了进行信号量传递之外, 还需要进行安全的通信或者消息传递, 否则直接访问共享内存必定会碰到资源竞争问题. python 中一般使用queue模块来安全的实现消息传递.
1
2
3
4
5
6
import queue

in_q = out_q = queue.Queue()

in_q.put('线程 1:将消息推入队列中')
out_q.get('线程 2: 获取队列中的消息')

线程池

  1. 无限循环分发任务的的线程池管理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
""" 线程池测试 """
import socket

from concurrent.futures import ThreadPoolExecutor


def echo_client(sock, client_addr):
print('收到socket消息, 地址:', client_addr)
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
sock.close()


def echo_server(addr):
# 创建线程池
pool = ThreadPoolExecutor(128)
sock = socket.socket(socket.AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
# 启动
while True:
client_sock, client_addr = sock.accept()
# 放入池, 启动线程
pool.submit(echo_client, client_sock, client_addr)


if __name__ == '__main__':
# 启动服务器
echo_server(('', 15000))
  1. 在正常项目开发中突然使用多线程以加速获取某一个资源流程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# 基于Queue的线程池处理类, 其中包含pool的创建, 任务的提交, 回调函数等
@Singleton
class BambooPool:
def __init__(self):
self._maxnum = 0
self.pool = None
self.tasks = queue.Queue()

@lazyproperty
def maxnum(self):
if self._maxnum > 0:
return self._maxnum
self._maxnum = current_app.config.get('BAMBOO_POOL_MAX_NUM', 16)
return self._maxnum

def create(self):
if self.pool:
try:
self.pool.shutdown()
except:
pass

try:
self.pool = ThreadPoolExecutor(max_workers=self.maxnum)
except Exception as msg:
current_app.logger.exception(f'bamboo pool创建发生异常:{msg}')
raise BambooStartNodeNotFound(f'发生未知异常:{msg}')
else:
current_app.logger.info(f'bamboo pool创建成功')

def submit_one_task(self, cb, *args, **kwargs):
if not self.pool:
self.create()
try:
app = current_app._get_current_object()
task = self.pool.submit(cb, app, *args, **kwargs)
except Exception as msg:
current_app.logger.exception(f'bamboo pool提交task异常:{msg}')
self.create()
task = self.pool.submit(cb, *args, **kwargs)
return task

def submit_and_save_task(self, cb, *args, **kwargs):
# 1. 创建task
task = self.submit_one_task(cb, *args, **kwargs)
# 2. 将task放入待观察队列中
self.tasks.put(task)

def async_wait_task(self, tasks=None, timeout=120):
""" 等待任务结果处理完成 """
start = int(time.time())
if not tasks:
tasks = self.tasks
num, sleep_interval = 0, 0.1

try:
while True:
time.sleep(sleep_interval)
num += 1
if num * sleep_interval > timeout:
current_app.logger.error(f'bamboo pool等待任务结果超时:{num}, 超时时间:{timeout}')
break
_task = tasks.get(False)
if not _task:
continue
if _task.done():
continue
tasks.put(_task)
except queue.Empty:
return True
except Exception as msg:
current_app.logger.exception(f'bamboo pool等待任务结果异常:{msg}')
return False
return False

def wait_special_tasks(self, tasks, timeout=120):
num, sleep_interval = 0, 0.5
if not tasks:
return False

try:
while True:
if not tasks:
current_app.logger.info(f'bamboo pool所有任务均处理结束, 轮询等待结束')
return True
time.sleep(sleep_interval)
num += 1
if num * sleep_interval > timeout:
current_app.logger.error(f'bamboo pool等待任务结果超时:{num}, 超时时间:{timeout}')
break
undo_tasks = []
for _task in tasks:
if not _task:
continue
if not _task.done():
undo_tasks.append(_task)
tasks = undo_tasks
except Exception as msg:
current_app.logger.exception(f'bamboo pool等待任务结果异常:{msg}')
return False
return False

# 2. 使用
async_tasks = []
drcc_pool = DrccPool()
task = drcc_pool.submit_one_task(flows, id, nids, False, skids)
async_tasks.append(task)
drcc_pool.wait_special_tasks(async_tasks)

这里基于 Python 的 ThreadPoolExecutor 实现了线程池, 各个函数说明如下:

  • submit: 非阻塞提交线程到线程池中并返回任务句柄
  • done: 判断某一个任务是否结束
  • result: 获取任务的返回值
  • wait: 等待任务结束, 不过这个不够灵活

协程在 python 中使用

关于协程的相关知识点介绍见同步编程到异步编程的技术演进第三章的介绍, 这里主要讲解下协程在 python 环境下的落地实现, 具体见python 协程.

参考

书籍参考:

  • <现代操作系统>
  • <python cookbook>
  • <python标准库>