Python Multithreading & Multiprocess
2022-11-15 14:16:00

1、并发编程理论

本章节中同步和异步阻塞和非阻塞较难理解, 只做了浅显的介绍, 如果需要深层次理解可以参考 怎样理解阻塞非阻塞与同步异步的区别? 问题下的回答

1-1 CPU 密集型与 I/O 密集型

CPU 密集型 (CPU-bound):

  • 在执行数据处理、科学计算等任务时, CPU 的资源被大量使用, 这个任务属于 CPU-bound

I/O 密集型 (I/O-bound):

  • 在进行交互操作、Web 应用等任务时, 大部分时间 CPU 都在等待 I/O (输入/输出) 任务的数据时, 这个任务属于 I/O-bound

1-2 并发与并行

并发 (Concurency): 代表计算机可以同时执行多个任务

  • 单核处理器: 计算机通过分配时间片的方式, 让多个任务交替运行, 这个过程被称为进程或是线程的上下文切换 (context switching)

并行 (Parallelism): 代表计算机同时处理多个任务

  • 多核处理器: 计算机调用多个处理器, 同时处理多个任务, 这种情况称为并行

1-3 进程与线程

进程 (Process):

  • 进程是操作系统中的一个基本概念, 它包含运行一个程序所需要的资源, 一个正在运行的应用程序在操作系统中被视为一个线程
  • 进程之间是相对的, 一个进程无法访问另一个进程的数据 (除非使用分布式计算方式), 一个进程的失败同样不会影响其他进程
  • 操作系统利进程将工作区划分为多个独立的区域, 进程作为程序的运行边界
  • 简单来说一个进程就相当于一个正在运行的应用程序, 是应用程序的一次动态执行过程

线程 (Thread):

  • 一个进程中可能存在一个或多个线程, 现成是操作系统分配处理器时间的基本单元, 可能有多个线程在同时执行代码
  • 在进程入口执行的一个线程被称为程序的主线程
  • 线程主要由 CPU 寄存器、调用栈和线程本低存储器 (Thread Local Storage, TLS) 组成
    • CPU 寄存器主要记录当前执行线程的状态
    • 调用栈主要用于维护线程所调用的内存与数据
    • TLS 主要用于存放线程的状态信息
  • 线程本身不是一个计算机硬件的功能, 而是操作系统提供的一种逻辑功能
  • 多线程本质上是进程中一段并发执行的代码, 需要操作系统提供 CPU 资源来运行和调度

进程和线程的区别

  • 进程拥有独立的地址空间, 进程崩溃后, 在保护模式下不会对其他进程产生影响
  • 线程是一个进程中的不同执行路径, 线程有自己的堆栈和局部变量, 却没有自己单独的地址空间, 一个线程崩溃会导致整个进程结束
  • 多进程的程序要比多线程的程序更安全可靠, 但在切换进程的时候, 资源耗费会比多线程更大, 效率相对较差
  • 多线程的程序有自己的局部变量, 对于需要共享局部变量的并发操作, 只能使用多线程实现

1-4 同步与异步

同步和异步关注的是消息通信机制 (synchronous communication / asynchronous communication)

同步 (Synchronous):

  • “调用者” 主动发送一个 “调用”, 在没有得到结果之前, 这个调用就不会返回, 但是一旦调用返回, 就能直接获得返回值
  • 同步相当于是由 “调用者” 主动等待这个 “调用” 的结果

异步 (Asynchronous):

  • 与同步相反, 在 “调用” 发出后, 这个调用直接返回了, 此时并没有返回值
  • 当一个异步过程的 “调用” 发出后, 调用者不会立即获得返回结果, 而是在 “调用” 发出后, “被调用者” 通过状态, 通知, 来告知 “调用者”, 或者通过 “回调函数” 处理这个调用

一个例子: 假设我需要知道我的卡里还剩下多少钱
同步通信机制下: 我 (“调用者”) 打电话给银行询问客服 (发送 “调用”), 客服告知正在查询, 并让我稍作等待, 于是我在通信的过程进行了等待, 稍后客服告知了我查询结果 (返回 “调用” 结果)
异步通信机制下: 我 (“调用者”) 打电话给银行询问客服 (发送 “调用”), 客服告知正在查询并挂断了电话 (不返回结果, 返回状态), 在等待结果期间, 我可以去做其他事情, 在查询完成后, 客服主动打电话告知我结果 (通知并”回调” 结果)

  • 同步和异步实际上代表的是两种不同的编程模型, 可以通过多种方式来实现

1-5 阻塞与非阻塞

阻塞和非阻塞描述的是程序在等待 “调用” 结果 (消息、返回值) 时的状态

阻塞调用是指在调用结果返回之前, 当前线程会被挂起, 调用线程只有在得到结果之后才会返回。

非阻塞调用指的是不能立刻得到结果之前, 该调用不会则当前线程

依然假设我需要知道我卡里还剩多少钱
阻塞调用: 在客服告诉我结果之前, 我会把自己 “挂起” 直到客服告诉我结果
非阻塞调用: 无论客服是否告诉我结果, 我都会继续做其他事情, 并隔一段时间检查一下客服是否有返回结果
(这里的阻塞与非阻塞和同步异步无关, 跟客服用何种方式告知你返回结果也无关)

补充说明
多线程 (Multi threading):

  • 多线程编程可以是实现异步的一种方式 (但并不是唯一), 创建多个线程一起运行
    • (Python 中就可以使用 asyncawait 函数实现异步)
  • 在多核的环境下, 每个线程都会被分配到独立的核心上运行从而实现 “并行”
  • 在单核的环境下, 操作系统会通过分配时间片的方式执行这些线程, 但也是 “并发” 地在执行
  • I/O 密集型任务更加适合使用异步编程方式, 不一定适合多线程
    • 线程本身会占用内存, 切换线程也会占用资源
  • 计算量密集的应用程序通常都适合使用多线程计算
  • Reference: https://www.youtube.com/watch?v=I3E4MHTpABA

2、多线程 threading 模块

2-1 创建和启动线程

threading.Thread 创建线程

  • 使用 import threading 导入多线程模块
  • 使用 thread_obj = threading.Thread(target=函数名称, args=(参数1, 参数2)) 来定义多线程对象
    • args 只有一个参数时, 需要在第一个参数后跟上逗号 ,
  • 使用 thread_obj.start() 启动一个线程
    • 可以同时启动多个线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading

def func(n):
total = 0
for i in range(n+1):
total += i
print(total)


thread_obj1 = threading.Thread(target=func, args=(100,))
thread_obj2 = threading.Thread(target=func, args=(200,))

thread_obj1.start() # 5050
thread_obj2.start() # 20100

重写 Thread 父类 run 方法实现多线程

  • 重写的多线程需要继承父类的 __init__ 成员
  • 重写 run 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading
import time


class MyThread(threading.Thread):
def __init__(self, name):
super(MyThread, self).__init__()
self.name = name

def run(self):
print(self.name)


thread_obj1 = MyThread("Thread-1")
thread_obj2 = MyThread("Thread-2")

thread_obj1.start() # Thread-1
thread_obj2.start() # Thread-2

计算子线程执行时间

  • 在执行 sleep 函数时将不会占用 CPU 资源, 程序将会被挂起
  • 每个线程都可以添加一个 join() 方法等待线程结束, 当所有 join() 等待的线程都结束后, 主线程才会继续运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time

start_time = time.time()
thread_list = []

print("程序主线程正在运行: ", threading.current_thread())

def func(name):
# 输出当前线程
print(f"子线程正在运行: {name}, {threading.current_thread()}")
time.sleep(3)

for i in range(3):
t = threading.Thread(target=func, args=(f"Thread-{i}",))
t.start()

for i in range(3):
t.join()

print(f"所有线程已结束, 花费时间: {time.time() - start_time}")
print("程序主线程运行结束: ", threading.current_thread())
1
2
3
4
5
6
7
# Output
程序主线程正在运行: <_MainThread(MainThread, started 8779113728)>
子线程正在运行: Thread-0, <Thread(Thread-1 (func), started 6110244864)>
子线程正在运行: Thread-1, <Thread(Thread-2 (func), started 6127071232)>
子线程正在运行: Thread-2, <Thread(Thread-3 (func), started 6143897600)>
所有线程已结束, 花费时间: 3.013370990753174
程序主线程运行结束: <_MainThread(MainThread, started 8779113728)>

统计当前活跃的线程数

主线程比子线程执行要快, 当主线程执行 active_count() (返回当前活动的线程总数) 时, 其他子线程都没执行完毕, 可以在主线程使用 threading.active_count() 统计所有活跃线程的数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading
import time

def func(name):
# 输出当前线程
print(f"子线程正在运行: {name}, {threading.current_thread()}")
time.sleep(1)

# 启动三个子线程
for i in range(3):
t = threading.Thread(target=func, args=(f"Thread-{i}",))
t.start()

print("当前活跃线程数量为: ", threading.active_count())
1
2
3
4
5
# Output
子线程正在运行: Thread-0, <Thread(Thread-1 (func), started 6144815104)>
子线程正在运行: Thread-1, <Thread(Thread-2 (func), started 6161641472)>
子线程正在运行: Thread-2, <Thread(Thread-3 (func), started 6178467840)>
当前活跃线程数量为: 4

守护进程

  • 实例化时候指定 daemon=True 可以将线程设置为守护进程, 当主线程结束时, 守护进程也会结束
  • 守护线程必须在线程 start() 方法之前设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading
import time

start_time = time.time()

def func(name):
print('子线程正在运行')
time.sleep(3)

for i in range(3):
t = threading.Thread(target=func, args=(i,), daemon=True)
t.start()

print('主线程结束, 共花费时间 %s' % (time.time() - start_time))
1
2
3
4
5
# Output
子线程正在运行
子线程正在运行
子线程正在运行
主线程结束, 共花费时间 0.00023508071899414062

2-2 线程安全

GIL 全局解释器锁

GIL 锁的定义

全局解释器锁(英语:Global Interpreter Lock,缩写GIL)

是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。

即便在多核心处理器上,使用GL的解释器也只允许同一时间执行一个线程。

在 Python 中即使多 CPU ,每个 CPU 的进程中的线程,同一时刻只能执行一个线程
而在其他大部分编程语言中是任意一个 “时刻” 一个cpu执行一个线程,多个cpu可以同时刻执行多个线程

为何会有 GIL ?

Python 设计初期, 为了规避多线程之间数据的完整性和状态同步问题, 引入了 GIL 这个概念, 发展至今大部分第三方库都依赖 GIL, 导致无法去除

如何规避 GIL 带来的问题

多线程 threading 机制依然是有用的,用于IO密集型计算, 因为在 I/O (read, write, send, recv, etc) 期间,线程会释放GIL, 实现CPU和IO的并行, 因此多线程用于IO密集型计算依然可以大幅提升速度, 但是多线程用于CPU密集型计算时,只会更加拖慢速度

使用 multiprocessing 的多进程机制实现并行计算、利用多核CPU优势, 为了应对GIL的问题,Python提供了multiprocessing

简单来说遇到 CPU 密集型计算使用 multiprocessing, 相当于同时开启两个程序一起运行; 遇到 I/O 密集型计算还是可以使用 threading

线程锁

代码在 https://c.runoob.com/compile/9/https://www.bejson.com/runcode/python3/ 进行测试

  • 由于在多线程中, 所有线程共享一个变量, 当程序在运行一半时释放 GIl 锁, 可能导致出现数据混乱问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading

total = 0

def func():
global total
for i in range(500000):
total += 1
total -= 1

for i in range(2):
t = threading.Thread(target=func, args=())
t.start()

print(total) # 程序运行多次结果都有概率不同
  • 使用互斥锁 (Lock)、递归锁 (RLock) 都可以避免这种情况发生

互斥锁 Lock

  • 使用 lock_obj = threading.Lock() 实例化一个互斥锁 (Mutual exclusion, Mutex) 对象
  • 使用 lock_obj.acquire() 可以锁定当前线程, 锁定线程若不解锁, 则不会进入下一个线程
  • 使用 lock_obj.release() 释放线程锁, 释放线程锁后可以执行下一个线程
  • 在操作数据前锁定线程, 在操作数据完后释放线程锁, 可以有效避免数据混乱的问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading

total = 0
mutex = threading.Lock()

def func():
global total
for i in range(500000):
mutex.acquire()
total += 1
total -= 1
mutex.release()

for i in range(2):
t = threading.Thread(target=func, args=())
t.start()

print(total) # 程序多次运行结果都为 0
  • Lock 的互斥锁不支持嵌套结构, 假如在一个互斥锁锁内部再嵌套一个互斥锁, 可能造成死锁程序无法继续运行

递归锁 RLock

  • RLock 类的用法和 Lock 一样, 但是 RLock 支持嵌套结构, 可以在一个线程中上多个 RLock
  • 只有当一个线程中所有的 RLock 都被 release, 后续的线程才能继续执行

信号量 Semaphore

  • 互斥锁和递归锁只允许一个线程修改数据, 而 Semaphore 最多可以允许 5 个线程同时修改数据
  • 使用 sem_obj = threading.BoundedSemaphore(信号量数量) 实例化一个信号量对象
  • 在线程中使用 sem_obj.acquire() 锁定当前线程
  • 在线程中使用 sem_obj.release() 释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading
import time


def func(name):
sem_obj.acquire()
print(f'Thread {name} is running')
time.sleep(3)
sem_obj.release()


sem_obj = threading.BoundedSemaphore(5)

for i in range(1, 21):
t = threading.Thread(target=func, args=(i,))
t.start()

# 程序每执行 5 个线程后将会等待 3 秒

2-3 事件 Event

  • 事件主要用于控制其他线程的执行, 事件是一个简单的线程同步对象
  • 使用 event_obj = threading.Event() 定义一个事件对象
  • 使用 event_obj.wait() 监听 Flag 状态
  • 全局共享一个 Flag, 当 Flag 值为 Flaseevent.wait() 就会阻塞, 反之则不阻塞
  • 使用 evene_obj.set() 设置 Flag 为 True
  • 使用 event_obj.clear() 设置 Flag 为 False
  • 使用 event_obj.is_set() 判断 Flag 状态为 True 或 False
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import threading
import time

event = threading.Event()


def lights():
count = 0
event.set() # 设置 Flag 为 True
while True:
if 5 < count <= 10:
event.clear() # count = 5 ~ 10 设置 Flag 为 False
print(f'\033[0;31m red light is on... {count}\033[0m {event.is_set()}')
elif count > 10:
event.set() # count > 10 设置 Flag 为 True
count = 0 # 将 count 清零重新计算
else:
print(f'\033[0;32m green light is on... {count}\033[0m {event.is_set}')

time.sleep(1)
count += 1


def car(name):
while True:
if event.is_set(): # 检测 Flag 标志位是否为 False
print(f'{name} is running...')
time.sleep(1)
else:
print(f'{name} Waiting for lights')
event.wait()
print(f'{name} OK Let\'s go')


lights_thread = threading.Thread(target=lights, args=())
car_thread = threading.Thread(target=car, args=("Taxi",))

lights_thread.start()
car_thread.start()

"""
1. lights_thread 进程 start 时设置 Flag 为 True
1. count 计数 5 秒绿灯
2. 大于 5 时切换 Flag 为 Flase
2. car_thread start 判断 Flag, 如果为 True, 则执行线程对象
1. count 大于 10 时重新设置 Flag 为 True
2. 以此反复
3. 当 lights_thread 设置 Flag 为 Flase 时, car_thread 进入等待
"""

2-4 条件 Condition

  • Condition被称为条件变量,除了提供与 Lock 类似的 acquirerelease 方法外,还提供了 waitnotify 方法

  • 使用 cond_obj = threading.Conditon() 实例化一个条件对象

  • 首先由一个线程使用 cond_obj.acquire() 并且判断一个条件变量

    • 如果条件不满足则 cond_obj.wait() 进入状态
    • 如果条件满足, 使用 cond_obj,notify() 通知其他方法, 其他处于 wait 状态的线程收到通知后会重新判断条件
  • 使用 while True 条件循环语句不断循环这一步骤, 实现不同线程通信

  • 当调用 wait 方法时, 线程会释放 Condition 内部的锁使线程进入堵塞 (blocked) 状态, 同时在 waiting 池中记录这个线程, 当调用 notify 方法时, Condition 对象会从 waiting 池中挑选一个线程, 通知其调用 acquire 方法尝试获取到锁

  • Condition 对象的构造函数可以接受一个 Lock/RLock 对象作为参数, 如果没有指定, 则 Conditon 对象会在内部创建一个 RLock

  • Conditon 对象还提供了 notifyAll 方法, 可以通知 waiting 池中所有的线程尝试 acquire 内部锁

  • 处于 waiting 状态的线程只能通过 notify 方法唤醒, 可能导致某些线程永远处于等待状态, 使用 notifyAll 可以避免这种情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import threading
import time

count = 0
condition = threading.Condition()


def producer():
while True:
if condition.acquire():
global count
if count <= 3:
count += 10
print(f'生产者 +10 count={count}')
condition.notify()
else:
condition.wait()
condition.release()


def consumer():
while True:
if condition.acquire():
global count
if count > 3:
count -= 1
print(f'消费者 -1: count={count}')
time.sleep(0.5)
condition.notify()
else:
condition.wait()
condition.release()


p = threading.Thread(target=producer, args=())
c = threading.Thread(target=consumer, args=())
p.start()
c.start()

Reference:

2-5 定时器 Timer

  • 定时器 Timer 指定 n 秒后再执行操作
  • 定义线程对象时直接使用 t_obj = threading.Timer(时间/秒, 函数名称) 来定义定时器线程对象
1
2
3
4
5
6
7
import threading

def func():
print('Hello')

t = threading.Timer(1, func)
t.start()

3、多进程 multiprocessing 模块

3-1 创建子进程

  • 使用 import multiprocessing 导入多进程模块
  • 使用 process_obj = multiprocessing.Process(target=函数名称, args=(参数1, 参数2)) 实例化一个多进程对象
  • 使用 process_obj.start() 启动子进程
  • 使用 process_obj.join() 等待进程结束
  • 由于启动多线程时是启动了一个新的 Python 进程, 需要将 process_obj.start() 放入 if __name__ == "__main__" 条件判断语句内, 避免无限循环
1
2
3
4
5
6
7
8
9
10
11
12
import multiprocessing
import os

def func():
print(f'Subprocess is running ... [{os.getpid()}]') # Subprocess is running ... [58620]
print(f' - Parent process pid is [{os.getppid()}]') # Parent process pid is [58618]

if __name__ == '__main__':
print(f'Parent process is running [{os.getpid()}]') # Parent process is running [58618]
p = multiprocessing.Process(target=func, args=())
p.start()
p.join()

3-2 进程间通信

Queue()

  • 使用 from multiprocessing import Queue 导入 Queue 模块
  • 使用 queue_obj = Queue() 实例化一个 Queue 对象
  • 启动子进程时, 需要将 queue_obj 作为参数传入
  • 在进程中使用 queue_obj.put('数据') 存入数据
  • 在任何位置使用 queue_obj.get() 获取 Queue 对象数据
  • 更多 queue 对象方法参考 Queue对象 Python 3.10 文档
1
2
3
4
5
6
7
8
9
10
11
from multiprocessing import Queue, Process

def func(queue_obj):
print('Subprocess is running...')
queue_obj.put(['Hello World', 'Hello Python'])

if __name__ == '__main__':
queue_obj = Queue()
process_obj = Process(target=func, args=(queue_obj,))
process_obj.start()
print(queue_obj.get()) # ['Hello World', 'Hello Python']

Piep()

  • Pipe 类似于 socket 模块, 主要用于数据传递而不是数据共享, pipe() 返回两个连个链接对象分别为管道的两端, 每个连接对象都有 send() 发送、recv() 接受方法
  • 使用 from multiprocess import Pipe 导入 Pipe 模块
  • 使用 parent_conn, child_conn = Pipe() 实例化子进程和父进程两端的通讯对象
  • 使用 conn.send(obj) 发送一个对象到连接的另一端
  • 接收端使用 conn.recv() 接受另一方发送的数据, 该方法会阻塞直到收到对象
1
2
3
4
5
6
7
8
9
10
from multiprocessing import Process, Pipe

def func(child_conn):
child_conn.send('Hello World')

if __name__ == '__main__':
parent_conn, child_conn = Pipe()
process_obj = Process(target=func, args=(child_conn,))
process_obj.start()
print(parent_conn.recv()) # Hello World

3-3 Manager 管理线程

  • 通过 Manager 可实现进程数据共享, Manager() 返回一个 manager 对象通过一个服务进程, 来使其他通过进程通过代理方式操作对象
  • manager 支持 listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValueArray
  • 使用 from multiprocessing import Manager 导入 Manager 模块
  • 使用 with Manager() as manager 实例化一个 manager server 对象
  • 使用 manager_obj = manager.list() 创建一个 manager 列表对象
  • manager_obj 需要作为参数传入子进程
  • 子进程可以直接操作 manager_obj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Process, Manager


def func(l):
l.append(1)


if __name__ == '__main__':
with Manager() as manager:
manager_obj = manager.list()

p_list = []
for i in range(10):
process_obj = Process(target=func, args=(manager_obj,))
process_obj.start()
p_list.append(process_obj)
for res in p_list:
res.join()

print(manager_obj) # [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

3-4 进程锁 (进程同步)

  • 类似线程锁, 锁定当前进程后其他进程无法继续运行, 主要用于保证输出在同一屏幕而会两行合并为一行
  • 使用 lock = multiprocessing.Lock() 实例化一个线程锁对象
  • 使用 lock.acquire() 锁定一个线程
  • 使用 lock.release() 释放一个线程

3-5 进程池

  • 由于进程间切换可能导致占用大量内存空间, 为防止这种情况可以使用进程池
  • 使用 from multiprocessing import Pool 导入进程池模块
  • 使用 pool = Pool(数量) 实例化 n 个进程池对象 (允许同时放入 5 个进程)
  • 使用 pool.apply(func=函数名称, args=(参数1, )) 同步 (串行) 执行 n 个线程
  • 使用 pool.apply_async(func=函数名称, args=(参数1, )) 异步 (并行) 执行 n 个线程
  • 使用 pool.close() 等待所有进程结束后再关闭进程池
  • 使用 pool.join() 等待所有子进程执行完毕后再结束主进程 (需要放在 pool.close() 之后)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process, Pool
import time

def func(name):
print(f'[{name}] Hello')
time.sleep(5)

if __name__ == '__main__':
pool = Pool(5)
for i in range(1, 11):
pool.apply_async(func=func, args=(i,))
pool.close()
pool.join()
print('Pool End')

4、协程

  • 线程和进程是由程序处罚系统接口, 最后的执行者是系统, 它本质上操作系统提供的功能
  • 协程的操作是程序员指定的, 在 Python 中拖过 yield, 人为的实现并发处理
  • 协程的意义:
    • 对于多线程应用, CPU 通过切片方式来切换线程间的执行, 现成切换时, 需要消耗时间
    • 协程只使用一个线程, 分解一个线程为多个 “微县城” 在一个线程中规定某个代码块的执行顺序
  • 协程应用场景:
    • I/O 密集型任务时 (大量操作不需要使用到 CPU)
  • 实现协程常用第三方模块 gevent 和 greenlet (gevent 是对 greenlet 的高级封装, 一般使用 gevent 即可)

4-1 greenlet

  • greenlet 通过 switch 方法在不同任务间切换
  • 使用 from greenlet import greenlet 导入模块
  • 使用 gr_obj = greenlet(函数名称) 实例化 greenlet 对象
  • 使用 gr_obj.switch() 手动切换下一个对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from greenlet import greenlet
import time

def func1():
print("[1] Hello func")
time.sleep(1)
print("[1] Conversion func2")
time.sleep(1)
gr_obj2.switch()
print('End of')


def func2():
print("[2] Hello")
time.sleep(1)
print("[2] Conversion func1")
time.sleep(1)
gr_obj1.switch()

gr_obj1 = greenlet(func1)
gr_obj2 = greenlet(func2)
gr_obj1.switch()
1
2
3
4
5
6
# Output
[1] Hello func
[1] Conversion func2
[2] Hello
[2] Conversion func1
End of

4-2 gevent

  • gevent 的代码封装度很高, 使用只需要了解基本方法
  • 使用 import gevent 导入模块
  • 使用 gevent.joinall([gevent.spawn(函数, "参数")]) 通过 jioinall 将任务函数和参数进行统一调度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import gevent
import requests


def f(url):
print(f'GET: {url}')
resp = requests.get(url)
data = resp.text
print(f"{len(data)} bytes received from {url}")


gevent.joinall([
gevent.spawn(f, "https://www.python.org/"),
gevent.spawn(f, "https://www.bilibili.com/")
]
)
1
2
3
4
5
# Output
GET: https://www.python.org/
50985 bytes received from https://www.python.org/
GET: https://www.bilibili.com/
4244 bytes received from https://www.bilibili.com/

补充说明:


Reference:

上一页
2022-11-15 14:16:00
下一页