Python 并发编程实战,用多线程、多进程和多协程加速程序运行
2022-12-01 19:39:00

0x00 引言

Python 并发编程简介

  1. 为什么要引入并发编程
    • 场景1: 普通爬虫需要花费长时间的任务, 通过并发编程可以减少花费时间
    • 场景2: APP 应用, 后台有大部分请求资源, 使用异步并发可以降低延迟
    • 引入并发, 提升程序运行速度
    • 学习掌握并发编程技术, 是程序员不可缺少的技能
  2. 有哪些程序提速的方法
    • 可以通过四种方式提升程序运行速度: 单线程串行(不加改造的程序)、多线程并发(threading)、多CPU并行(multiprocessing)、多机器并行 (hadoop/hive/spark)
    • 本节课程不会介绍多机器并行方式, 主要使用 threading、multiprocessing
  3. Python对并发编程的支持
    • 多线程: threading, 利用 CPU 和 IO 可以同时执行的原理, 让 CPU 不会干巴巴等待 IO 完成
    • 多进程: multiprocessing, 利用多核心 CPu 的能力, 真正的并行执行任务
    • 异步 IO: asyncio, 在单线程利用 CPU 和 IO 同时执行的原理, 实现函数异步执行
    • 使用 Lock 对资源枷锁, 防止冲突访问
    • 使用 Queue 实现不同进程/现成间的数据通信, 实现生产者-消费者模式
    • 使用线程池 Pool/进程池 Pool, 简化线程/进程的任务提交、等待结束、获取结果
    • 使用 subprocess 启动外部程序的进程, 并进行输入输出交互

怎样选择多线程、多进程和多协程

  1. 什么是 CPU 密集型计算、IO 密集型计算?
    • CPU-bound (CPU 密集型)
      • 程序运行的速度受到 CPU 的限制
      • CPU 密集型也叫计算密集型, 是指 I/O 在很的短时间内就可以完成, CPU 需要大量的计算和处理, 特点是 CPU 占用率相当高
      • 例如: 压缩解压缩、加密解密、正则表达式搜索
    • I/O-bound (IO 密集型)
      • 程序运行的速度受到 I/O 的限制
      • IO 密集型是指系统运作大部分的情况下是 CPU 在等待 I/O (硬盘/内存) 的读/写操作, CPU 实际占用率较低
      • 例如: 文件处理程序、网络爬虫程序、读写数据库程序
  2. 多线程、多进程、多协程的对比
    • 多进程 Process (multiprocessing)、多线程 Thread (threading)、多协程 Coroutine (asyncio)
    • 一个进程中, 可以启动 N 个线程、一个线程中, 可以启动 N 个协程
    • 多线程 Thread
      • 相比进程, 更轻量级, 占用资源少, 但在 Python 中多线程只能并发执行, 不能利用多 CPU (GIL 锁限制)
      • 相比协程, 启动数目有限制, 占用内存资源, 有线程切换开销
      • 主要适用于 I/O 密集型计算、同时运行的任务数目要求不多
    • 多进程 Process
      • 可以利用多核 CPU 并行运算, 但占用资源最多, 可启动数目最少
      • 主要适用于 CPU 密集型计算
    • 多协程 Coroutine
      • 内存开销最少, 启动协程数量最多, 缺点在于支持的库有限制, 如 requests 不支持协程, 需要使用 aiohttp 代替, 实现代码相对复杂
      • 适用于 I/O 密集型计算、需要超多任务运行、拥有现成库支持的场景

GIL 全局解释器锁

相比 C/C++/Java, Python 确实慢, 在一些特殊场景下, Python 比 C++ 慢 100~200 倍

由于速度慢的原因, 很多公司的基础架构代码仍然在使用 C/C++ 开发, 比如各大公司阿里、腾讯、快手的推荐引擎、搜索引擎、存储引擎等对性能要求高的模块

  1. Python 速度慢的原因:

    • Python 是动态类型语言, 不像 C/Java 需要编译后再执行, 而 Python 需要边解释边执行, Python 中的变量可以是数字也可以是字符, 同样在程序运行时, 需要不断检查变量的类型
    • Python 存在 GIL 锁, 无法利用多核 CPU 并发执行
  2. 全局解释器锁 (Global Interpreter Lock, GIL)

    • GIL 是计算机程序设计语言解释器用于同步的一种机制, 它使得任何时刻仅有一个线程在执行。即使在多核心处理器上, 使用 GIL 的解释器也只允许统一时间执行一个进程
    • With the GIL, you get cooperative multitasking
    • When a thread is running, it holds the GIL
    • GIL released on I/O (read,write,send,recv,etc.)
  3. 为何会有 GIL 解释器锁

    • Python 设计的初期, 为了规避并发问题引入了 GIL, 而到现在因为大部分都库都依赖于 GIL 而运行, 现在想去除而又去不掉了
    • GIL 主要存在的目的是为解决线程之间数据完整性和状态同步的问题, Python 中的对象管理, 是使用引用计数器进行的, 引用数为 0 则释放对象
  4. 怎样规避 GIL 带来的限制

多线程 threading 机制仍然是有用的, 用于 IO 密集型计算

  • 在 I/O (read, write, send, recv, etc.) 期间, 线程会释放 GIL, 实现 CPU 和 IO 的并行, 因此在多线程用于 IO 密集型计算仍然可以大幅提升速度
  • 但是多线程用于 CPU 密集型计算时, 只会更加拖慢速度

使用 multiprocessing 的多进程机制实现并行计算、利用多核 CPU 优势, 为了应对 GIL 的问题, Python 提供了 multiprocessing

0x01 利用多线程加速爬虫程序

Python 创建多线程的方式

  • 首先准备任意一个函数
  • 使用 import threading 导入多线程模块
  • 使用 t = threading.Thread(target=函数名称, args=(参数,)) 实例化一个线程对象
  • 使用 t.strat() 启动这个线程, 使用 t.join() 等待这个线程结束
1
2
3
4
5
6
7
8
import threading

def func(name):
print(name)

t = threading.Thread(target=func, args=("Python",))
t.start()
t.join()

单线程爬虫 VS 多线程爬虫速度对比

  • 首先创建一个 craw() 函数, 调用 requests 模块, 向页面发送请求并返回 url 和页面长度
  • 使用 for 循环结构依次调用 craw() 函数, 实现单线程调用, 并计算运行时间
  • 使用 threads 变量存储线程对象, 并依此添加所有 url, 通过 for 循环所有线程对象启动和终止
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import requests
import threading
import time

def craw(url):
r = requests.get(url)
print(url, len(r.text))

if __name__ == '__main__':
urls = [
f"https://www.cnblogs.com/sitehome/p/{page}"
for page in range(1, 51)
]
# single thread running
print('single thread running')
start_time = time.time()
for url in urls:
craw(url)
print(f'singe thread cost {time.time() - start_time} seconds.')

# multi thread running
print('multi thread running')
start_time = time.time()
threads = []
for url in urls:
threads.append(threading.Thread(target=craw, args=(url,)))

for thread in threads:
thread.start()

for thread in threads:
thread.join()

print(f'multi thread cost {time.time() - start_time} seconds.')
1
2
3
4
5
6
7
8
9
10
11
# 程序运行结果
single thread running
https://www.cnblogs.com/sitehome/p/1 68683
...
https://www.cnblogs.com/sitehome/p/50 69779
singe thread cost 3.5241050720214844 seconds.
multi thread running
https://www.cnblogs.com/sitehome/p/5 69428
...
https://www.cnblogs.com/sitehome/p/29 69997
multi thread cost 0.5849659442901611 seconds.

0x02 生产者消费者模式多线程爬虫

Pipeline 技术架构和生产者消费者模型

  1. 多组件的 Pipeline 技术架构
    • 复杂的事情不会一下子做完, 而是分很多中间步骤一步步完成
    • 比如说一个程序从输入数据到输出数据, 中间会经过多个处理器 (Processor), 每个处理器是一个处理模块, 这种将数据通过多个模块来处理的技术架构叫做 Pipeline 架构
    • 生产者消费者模式就是典型的 Pipeline 架构, 生产者生产数据后, 会传递给消费者来消费数据
    • 生产者传入的数据作为生产原料, 消费者输出的内容作为输出数据
  2. 生产者消费者爬虫的架构
    • 生产者消费者架构相当于内部有两个 Processor
    • 第一个 Processor 获取待爬取的 URL 进行网页的下载, 将下载好的内容放进一个 “网页队列”
    • 第二个 Processor 消费 “网页队列” 中的数据进行解析, 并且把解析的结果进行存储
  3. Python 创建线程队列的方式
    • 使用 queue.Queue 可以用于多线程之间的、线程安全的数据通信
    • 使用 import queue 导入队列类库
    • 使用 q = queueQueue() 创建队列对象
    • 使用 q.put(item) 添加元素
      • 当队列元素已满时会发生阻塞
    • 使用 q.get() 获取元素
      • 当队列元素为空时会发生阻塞
    • 使用 q.qsize() 查看元素大小
    • 使用 q.empty() 判断队列是否为空
    • 使用 q.full() 判断队列是否已满

代码编写实现生产者消费者爬虫

  • 先创建两个函数, craw() 用于获取页面 html 结果, parse() 用于提取页面元素
  • 使用 url_queue = queue.Queue() 创建一个 url 队列的对象
  • 使用 html_queue = queue.Queue() 创建一个 html 队列的对象
  • 再创建两个队列函数, 无限循环并一直等待 queue 队列 getput 传入或输出数据
    • do_craw() 线程通过 url_queue.get() 获取 url, 并将 url 传入 craw() 函数获取 html 内容, 再通过 html_queue.put(html) 将获取到的 html 内容传入 html_queue 队列
    • do_parse() 线程通过 html_queue.get() 获取 do_craw() 传入的 html 内容, 调用 parse() 函数解析, 并将解析的结果直接输出
  • 通过 for 循环结构将 url 全部 put 进入 url_queue 队列
  • 再通过 for 循环结构创建并启动指向 do_craw()do_parse() 函数的多个线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import threading
import requests
import queue
from bs4 import BeautifulSoup

def craw(url):
r = requests.get(url)
return r.text

def parse(html):
soup = BeautifulSoup(html, 'html.parser')
links = soup.find_all("a", class_="post-item-title")
return [(link["href"], link.get_text()) for link in links]

def do_craw(url_queue, html_queue):
while True:
url = url_queue.get()
html = craw(url)
html_queue.put(html)

def do_parse(html_queue):
while True:
html = html_queue.get()
results = parse(html)
for result in results:
print(result)

if __name__ == '__main__':
urls = [
f"https://www.cnblogs.com/sitehome/p/{page}"
for page in range(1, 51)
]

url_queue = queue.Queue()
html_queue = queue.Queue()

for url in urls:
url_queue.put(url)

for idx in range(3):
t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}")
t.start()

for idx in range(2):
t = threading.Thread(target=do_parse, args=(html_queue,), name=f"parse{idx}")
t.start()

0x03 线程安全问题以及 Lock 解决方案

线程安全概念介绍

  • 线程安全指某个函数、函数库在多线程环境中被调用时, 能够正确地处理多个线程之间的共享变量, 使程序功能正确完成。
  • 由于线程的执行随时会切换, 就造成了不可预料的结果, 出现线程不安全

Lock 用于解决线程安全问题

  • 使用 lock = threading.Lock() 实例化一个线程锁对象
  • 使用 lock.acquire() 锁定线程, 再使用 lock.release() 释放线程锁, 或是在执行代码前使用上下文管理 with lock: 后续执行的代码都将被设置线程锁, with 代码块内的代码运行结束后, 线程锁自动释放
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import threading

lock = threading.Lock()

class Account:
def __init__(self, balance):
self.balance = balance

def draw(account, amount):
with lock:
if account.balance >= amount:
print(threading.current_thread().name, "取钱成功")
account.balance -= amount
print(threading.current_thread().name, "余额", account.balance)
else:
print(threading.current_thread().name, "取钱, 余额不足")

if __name__ == '__main__':
account = Account(1000)
ta = threading.Thread(target=draw, args=(account, 800))
tb = threading.Thread(target=draw, args=(account, 800))

ta.start()
tb.start()

0x04 好用的线程池 ThreadPoolExecutor

线程池的生命周期和原理

  1. 线程的生命周期状态包括新建、就绪、运行、终止和阻塞
    • 新建的线程通过 start 进入就绪状态, 此时线程并没有真正开始运行, 线程的运行需要系统进行调度
    • 通过系统调度线程真正进入运行状态
      • 运行中的线程可能会因为失去 CPU 资源, 重新进入就绪状态
      • 也有可能会因为遇到 sleep 或 io 进入阻塞状态, 等到 sleep 或 io 完成后重新进入就绪状态
    • 当线程运行完毕后线程进入终止状态
  2. 线程池的原理
    • 新建线程系统时需要分配资源, 终止线程系统需要回收资源, 当系统中存在大量线程需要使用时, 会频繁的新建和终止线程, 会产生大量的时间开销和线程开销
    • 如果可以重用线程, 则可以减去新建/终止的开销
    • 使用线程池时, 当有新的任务创建后, 新的任务会被放入任务队列中, 线程池会依次调用线程队列中的任务, 放入线程池中执行, 完成后会取下一个任务继续执行, 当所有线程都执行完成后, 线程池并不会销毁, 而是等待下一个任务的到来
  3. 线程池的好处
    • 提升性能: 因为减去了大量新建、终止线程的开销, 重用了线程资源
    • 适用场景: 适合处理突发性大量请求或是需要大量线程完成任务、但实际任务处理时间较短
    • 防御功能: 能有效避免系统因为创建线程过多, 而导致系统负荷过大响应变慢等问题
    • 代码优势: 使用线程池的语法比自己新建线程执行线程更加简洁

ThreadPoolExecutor 的使用语法

  • 使用 from concurrent.futures import ThreadPoolExecutor, as_completed 导入模块
  • 使用 with ThreadPoolExecutor() as pool: 实例化一个线程池对象
    • 在 with 语句内使用 results = pool.map(函数名称, 参数列表) 将每一个参数作用于线程池对象并获得返回结果
    • 也可以使用列表推导式 futures = [pool.submit(函数名称, 参数) for 参数 in 参数列表] 的方式通过 pool.submit 给线程池传入对象
    • futures 返回对象的结果存储在 futures.results() 内, 两种遍历方式如下 (使用 as_completed 时返回的顺序是不一定的, 而普通的遍历是依次返回)
1
2
3
4
for feture in futures:
print(future.result())
for future in as_completed(futures):
print(future.result())

使用线程池改造爬虫程序

  • craw 线程池
    • 使用 with concurrent.futures.ThreadPoolExecutor() as pool: 创建线程池对象
    • 使用 pool.map(craw, urls) 将 urls 列表中每一个元素都传入 craw 函数, 作用与线程池
    • 使用 zip(urls, htmls) 将 urls 的元素依次与 htmls 的元素对应生成一个 zip 对象 (url 对应 htmls)
    • 通过 for 循环遍历输出 url 对应的 html 页面长度信息 (在程序内已经存在 htmls 列表)
  • parse 线程池
    • 使用 with concurrent.futures.ThreadPoolExecutor() as pool: 创建线程池对象
    • 使用 for 循环依次遍历 url 和 html 内容
    • 使用 pool.submit(parse, html) 将 html 内容传入 parse 函数进行解析
    • 创建一个 futures 字典用来存储 url 以及对应提取后的内容
    • 使用 futures[future] = url 为字典填入 url 和提取后的内容
    • 最后通过 for 循环遍历 futures.items() 字典的 url 和提取后的内容
    • (使用 as_completed 遍历元素时, 不可以同时使用 items() 函数, url 需要在循环结构内再次进行赋值)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import requests
import concurrent.futures
from bs4 import BeautifulSoup

def craw(url):
r = requests.get(url)
return r.text

def parse(html):
soup = BeautifulSoup(html, 'html.parser')
links = soup.find_all("a", class_="post-item-title")
return [(link["href"], link.get_text()) for link in links]

if __name__ == '__main__':
urls = [
f"https://www.cnblogs.com/sitehome/p/{page}"
for page in range(1, 51)
]

# craw
with concurrent.futures.ThreadPoolExecutor() as pool:
htmls = pool.map(craw, urls)
htmls = list(zip(urls, htmls))
for url, html in htmls:
print(url, len(html))
print('craw over.')

# parse - 1
# with concurrent.futures.ThreadPoolExecutor() as pool:
# futures = {}
# for url, html in htmls:
# future = pool.submit(parse, html)
# futures[future] = url

# for future, url in futures.items():
# print(url, future.result())
# print('parse over.')

# parse - 2
with concurrent.futures.ThreadPoolExecutor() as pool:
futures = {}
for url, html in htmls:
future = pool.submit(parse, html)
futures[future] = url

for future in concurrent.futures.as_completed(futures):
url = futures[future]
print(url, future.result())

0x05 在 Web 服务中使用线程池加速

#Skip-Temporarily

  1. Web 服务的架构以及特点
  2. 使用线程池 ThreadPoolExecutor 加速
  3. 代码用 Flask 实现 Web 服务并实现加速

0x06 使用多进程 multiprocessing 加速程序运行

为何有了多线程还需要多进程

  • 如果遇到了 CPU 密集型计算, 多线程反而会降低执行速度
  • 虽然有全局解释器锁 GIL, 但因为有 IO 的存在, 多线程依然可以加速运行
  • CPU 密集型计算, 线程的自动切换反而变成了负担, 多线程甚至会减慢程序运行速度
  • multiprocessing 模块就是 Python 为了解决 GIL 缺陷引入的一个模块, 原理是用多进程在多 CPU 上并行执行
  • 多进程和多线程的语法几乎完全相同, 如下图所示

单线程、多线程、多进程对比 CPU 密集计算速度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import math
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
PRIMES = [112272535095293] * 100


def is_prime(number):
if number < 2:
return False
if number == 2:
return True
if number % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(number)))
for i in range(3, sqrt_n + 1, 2):
if number % i == 0:
return False
return True


def single_thread():
for number in PRIMES:
is_prime(number)


def multi_thread():
with ThreadPoolExecutor() as pool:
pool.map(is_prime, PRIMES)


def multi_process():
with ProcessPoolExecutor() as pool:
pool.map(is_prime, PRIMES)


if __name__ == '__main__':
start_time = time.time()
single_thread()
print(f"single thread cost {time.time() - start_time} seconds")
start_time = time.time()
multi_thread()
print(f"multi thread cost {time.time() - start_time} seconds")
start_time = time.time()
multi_process()
print(f"multi process cost {time.time() - start_time} seconds")

1
2
3
4
# 程序运行结果
single thread cost 21.787539958953857 seconds
multi thread cost 21.68295693397522 seconds
multi process cost 2.8622336387634277 seconds

0x07 在 Flask 服务中使用进程池加速

#Skip-Temporarily

0x08 Python 异步 IO 实现并发爬虫

协程的基本概念

  • 协程: 在单线程内实现并发
  • 核心原理在于用一个超级循环 (其实就是 while true) 循环, 配合 IO 多路复用 (IO 时 CPU 可以做其他事)
  • 在一个线程进行中的步骤为 CPU、等待 IO、CPU, 使用协程可以在中途等待 IO 完成后, 直接进入下一个任务的第一个 CPU 计算状态, 而不必等待结果返回后再进入下一个任务

Python 异步 IO 库 asyncio 使用

  • 使用 import asyncio 导入异步 IO 库
  • 使用 async def 函数名称(参数) 定义协程函数
  • 携程函数内使用 await 属性名称 定义代码块, 不会等待, 而是直接切换到下一个任务
  • 使用 loop = asyncio.get_event_loop() 获取事件循环 (超级循环)
  • 使用 tasks = [loop.create_task(函数名称(参数)) for url in urls] 创建 task 列表
  • 使用 loop.run_until_complete(asyncio.wait(task)) 执行 task 列表
  • 使用协程需要对应的库支持异步 IO 特性, 其中 requests 不支持异步, 而 aiohttp 支持
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import aiohttp
import time


urls = [
f"https://www.cnblogs.com/sitehome/p/{page}"
for page in range(1, 51)
]


async def async_craw(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
result = await resp.text()
print(f"{url} {len(result)}")

loop = asyncio.get_event_loop()

tasks = [
loop.create_task(async_craw(url))
for url in urls]

start_time = time.time()
loop.run_until_complete(asyncio.wait(tasks))
print(f"async craw cost {time.time() - start_time} seconds.")

0x09 在异步 IO 中使用信号量控制爬虫并发度

信号量 Semaphore

  • 信号量 (Semaphore) 又称为信号量, 旗语言
  • 是一个同步对象, 用于保持在 0 至最大值之间的一个计数器
  • 当线程完成一次对该 semaphore 对象的等待 (wait) 时, 该计数值减一
  • 当线程完成一次对 semaphore 对象的释放 (release) 时, 计数值减一
  • 当计数值为 0, 则线程等待该 semaphore 对象不再能成功直至该 semaphore 对象变成 signaled 状态
  • semaphore 对象的计数值大于 0, 为 signaled 状态, 计数值等于 0, 为 nonsignaled 状态

使用 Semaphore 模块控制并发度

  • 使用 sem = asyncio.Semaphore(10) 实例化一个信号量对象
  • 使用 async with sem: 语句块的代码都会遵循设置好的信号量运行
  • 或是使用 await sem.acquire() 锁定信号量, 再 try: 语句执行代码, 最后通过 finally: sem.release() 释放信号量限制

Reference:

上一页
2022-12-01 19:39:00
下一页