第四部分:asyncio 协程编程
协程(Coroutine)与 asyncio 库是 Python 异步编程的核心,提供了高效的 I/O 密集型任务处理方式。理解协程的本质和 asyncio 的工作原理是掌握异步编程的关键。
1、协程的本质认知
1.1 协程核心概念
1.1.1 协程的定义
协程是一种特殊的函数,可以在执行过程中暂停并从暂停处恢复执行。与线程的关键区别:
| 特性 | 协程 | 线程 |
|---|---|---|
| 调度方式 | 用户态,程序控制(协作式调度) | 操作系统控制(抢占式调度) |
| 运行模型 | 单线程 + 事件循环 | 多线程 |
| 切换开销 | 极小(无上下文切换) | 较大(需要保存/恢复线程状态) |
| 让出控制 | 主动让出(await关键字) | 操作系统抢占 |
| 并发能力 | 高并发(适合 I/O 密集型) | 有限并发(受线程数限制) |
| 资源消耗 | 轻量级(可创建成千上万个协程) | 重量级(线程数量受限) |
| 共享数据 | 无需锁(单线程内自然安全,除非有阻塞操作) | 需要锁(多线程竞争) |
1.1.2 协程工作机制
核心组件:
事件循环(Event Loop):协程调度的核心,负责管理和调度所有协程
- 维护就绪任务队列和等待任务队列
- 不断检查并执行就绪任务
- 监听 I/O 事件并唤醒相应协程
await 关键字:协程暂停和恢复的唯一机制
- 暂停当前协程执行
- 将控制权交还给事件循环
- 等待被 await 的对象完成
- 恢复协程执行并返回结果
协作式调度原理:
- 协程主动通过
await让出控制权,而非操作系统抢占 - 避免了锁竞争和死锁问题
- 要求开发者避免长时间运行而不让出控制权的代码
1.2 协程状态与生命周期
1.2.1 协程的五种状态
协程在其生命周期中经历以下状态:
- Created(创建):协程对象被创建但未执行
- Running(运行):协程正在执行
- Suspended(暂停):协程因
await暂停,等待 I/O 完成 - Completed(完成):协程正常结束
- Cancelled(取消):协程被外部取消
1.2.2 状态切换流程
Created → Running → Suspended → Running → Completed
↓
Cancelled详细切换过程:
- 协程开始执行 → Running
- 遇到
await→ Suspended,事件循环调度其他任务 - I/O 操作完成 → 重新进入就绪队列 → Running
- 协程函数结束 → Completed
1.3 协程的适用场景与限制
1.3.1 协程的性能优势
协程在以下场景表现出色:
- I/O 密集型任务:网络请求、文件读写、数据库操作
- 高并发场景:需要同时处理大量连接(如 Web 服务器)
- 异步 API 调用:多个 API 并发请求
1.3.2 协程的性能陷阱
协程并非总是更快,以下情况可能导致性能下降:
1. CPU 密集型任务:
- 协程运行在单线程,无法利用多核
- 持续占用 CPU,无法让出控制权
- 无 I/O 等待期间,协程并发优势消失
2. 过度的上下文切换:
- 频繁的
await调用增加调度开销 - 简单操作的
await开销可能超过收益
3. 不合理的并发设计:
- 按顺序
await每个任务,失去并发优势 - 应使用
asyncio.gather()等并发 API
1.3.3 阻塞操作的灾难
为什么time.sleep()会卡死 asyncio:
time.sleep()是同步阻塞操作,会阻塞整个线程- 在 asyncio 的单线程环境中,阻塞线程等于阻塞事件循环
- 事件循环被阻塞,所有协程都无法获得执行机会
正确的做法:
- 使用
await asyncio.sleep()代替time.sleep() - 使用异步库代替同步库(如
aiohttp代替requests) - 必须使用同步代码时,使用
asyncio.to_thread()或run_in_executor()
2、基础语法与运行模型
2.1 async/await 基础
2.1.1 async 定义协程函数
async def是定义协程函数的关键语法,它告诉 Python 解释器这个函数将返回一个协程对象。
协程函数的特性:
- 调用协程函数不会立即执行函数体,而是返回一个协程对象
- 协程对象需要通过事件循环来执行
- 协程对象表示一个可执行的协程实例
协程函数 vs 协程对象:
| 概念 | 定义 | 说明 |
|---|---|---|
| 协程函数 | 用async def定义的函数 | 是一个可调用的对象 |
| 协程对象 | 调用协程函数返回的对象 | 需要事件循环执行 |
import asyncio
async def simple_coroutine():
"""定义协程函数"""
print("协程开始执行")
await asyncio.sleep(1) # 模拟异步操作
print("协程执行完成")
return "协程结果"
# 调用协程函数返回协程对象,不执行
coro = simple_coroutine()
print(type(coro)) # <class 'coroutine'>
coro.close() # 显式关闭未执行的协程,避免RuntimeWarning
# 执行协程
result = asyncio.run(simple_coroutine())
print(result) # "协程结果"2.1.2 await 语义详解
await关键字的作用:
- 暂停当前协程的执行
- 将控制权交还给事件循环
- 等待 await 对象完成
- 恢复协程执行并返回结果
await 只能用于:
async def定义的函数内部- 等待可等待对象(Awaitable):协程、Task、Future
2.2 事件循环
2.2.1 Event Loop 的职责
事件循环是 asyncio 的核心组件,主要职责包括:
- 管理和调度协程任务:维护待执行、运行中和暂停的任务列表
- 监听 I/O 事件:使用高效的 I/O 多路复用机制(如 epoll、kqueue)
- 执行回调函数:管理定时器和 I/O 完成时的回调
- 处理定时器:管理任务的超时和延迟执行
2.2.2 任务队列管理
事件循环维护多个队列来管理不同类型的任务:
- 就绪队列:待执行的任务,已准备好运行或刚被唤醒
- 等待队列:等待 I/O 完成的任务,处于暂停状态
- 回调队列:等待执行的回调函数
调度策略:按照先进先出(FIFO)顺序从就绪队列中取出任务执行。
2.2.3 常用接口
import asyncio
async def demonstrate_loop_api():
# 获取当前运行的事件循环
loop = asyncio.get_running_loop()
print(f"当前循环: {loop}")
# 获取循环信息
print(f"循环是否运行: {loop.is_running()}")
# asyncio.run()内部自动创建和管理事件循环
asyncio.run(demonstrate_loop_api())2.2.4 事件循环工作流程示例
以下示例展示事件循环如何调度和管理多个协程任务:
import asyncio
import time
async def task_a():
"""任务A:模拟I/O操作"""
print(f"[{time.strftime('%H:%M:%S')}] 任务A开始执行")
await asyncio.sleep(0.5) # 模拟I/O等待,让出控制权
print(f"[{time.strftime('%H:%M:%S')}] 任务A恢复执行")
await asyncio.sleep(0.3)
print(f"[{time.strftime('%H:%M:%S')}] 任务A完成")
return "结果A"
async def task_b():
"""任务B:模拟I/O操作"""
print(f"[{time.strftime('%H:%M:%S')}] 任务B开始执行")
await asyncio.sleep(0.3) # 让出控制权
print(f"[{time.strftime('%H:%M:%S')}] 任务B恢复执行")
await asyncio.sleep(0.2)
print(f"[{time.strftime('%H:%M:%S')}] 任务B完成")
return "结果B"
async def task_c():
"""任务C:模拟I/O操作"""
print(f"[{time.strftime('%H:%M:%S')}] 任务C开始执行")
await asyncio.sleep(0.2) # 让出控制权
print(f"[{time.strftime('%H:%M:%S')}] 任务C恢复执行")
print(f"[{time.strftime('%H:%M:%S')}] 任务C完成")
return "结果C"
async def main():
"""演示事件循环的任务调度"""
print("=== 事件循环任务调度演示 ===\n")
start = time.time()
# 创建三个任务,它们会被加入事件循环的就绪队列
task1 = asyncio.create_task(task_a())
task2 = asyncio.create_task(task_b())
task3 = asyncio.create_task(task_c())
# 等待所有任务完成
results = await asyncio.gather(task1, task2, task3)
elapsed = time.time() - start
print(f"\n总耗时: {elapsed:.2f}秒")
print(f"任务结果: {results}")
asyncio.run(main())输出分析:
从输出可以看到:
- 三个任务几乎同时开始执行(被加入就绪队列)
- 当任务遇到
await asyncio.sleep()时,主动让出控制权 - 事件循环调度其他就绪任务执行
- I/O 完成后,任务重新进入就绪队列并恢复执行
- 总耗时约 0.8 秒(而非顺序执行的 1.5 秒),体现了并发优势
3、并发调度的核心 API
asyncio 提供了丰富的 API 来管理并发任务,是实现高效异步编程的关键。
3.1 创建并发任务
3.1.1 asyncio.create_task()
create_task()将协程封装为 Task 对象,立即加入事件循环准备执行。
Task 对象的特点:
- 提供对协程执行状态的控制能力
- 可以检查完成状态、取消执行等
- 立即调度到事件循环,与其他任务并发执行
import asyncio
async def task(name, delay):
print(f"任务 {name} 开始")
await asyncio.sleep(delay)
print(f"任务 {name} 完成")
return f"结果-{name}"
async def main():
# create_task立即调度任务,不会阻塞
task1 = asyncio.create_task(task("A", 1))
task2 = asyncio.create_task(task("B", 1))
# 等待两个任务完成
results = await asyncio.gather(task1, task2)
print(f"所有结果: {results}")
asyncio.run(main())3.1.2 asyncio.gather()
gather()并发执行多个可等待对象,并收集它们的结果。这是最常用的并发执行方式。
gather 的特点:
- 将所有传入的可等待对象包装成任务并并发执行
- 等待所有任务完成后按顺序返回结果
- 支持
return_exceptions=True参数,收集异常而非立即传播 - 返回结果是一个列表,顺序与传入的顺序一致!
import asyncio
async def fetch_data(source, delay):
print(f"开始从{source}获取数据")
await asyncio.sleep(delay)
print(f"从{source}获取数据完成")
return f"{source}数据"
async def main():
# 并发执行多个任务,收集所有结果
results = await asyncio.gather(
fetch_data("服务器A", 1),
fetch_data("服务器B", 0.8),
fetch_data("服务器C", 1.2)
)
print(f"所有结果: {results}")
asyncio.run(main())3.1.3 asyncio.wait()
wait()提供更灵活的等待控制,可以等待特定条件。
wait 的灵活性:
FIRST_COMPLETED:等待第一个任务完成ALL_COMPLETED:等待所有任务完成(默认)FIRST_EXCEPTION:等待第一个异常
import asyncio
async def worker(name, delay):
await asyncio.sleep(delay)
return f"{name}完成"
async def main():
# 创建多个任务
tasks = [
asyncio.create_task(worker("任务1", 1)),
asyncio.create_task(worker("任务2", 2)),
]
# 等待第一个任务完成
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(f"第一个完成的任务: {done.pop().result()}")
# 取消剩余任务
for task in pending:
task.cancel()
asyncio.run(main())3.2 顺序 vs 并发 await
性能差异原理:
- 顺序等待:一个任务完成后才开始下一个,总耗时 = 各任务耗时之和
- 并发等待:多个任务同时运行,总耗时 ≈ 最长任务的耗时
import asyncio
import time
async def fetch_data(source, delay):
await asyncio.sleep(delay)
return f"{source}数据"
async def main():
# 顺序等待 - 总耗时约3秒
start = time.time()
await fetch_data("A", 1)
await fetch_data("B", 1.5)
await fetch_data("C", 0.5)
sequential_time = time.time() - start
print(f"顺序方式耗时: {sequential_time:.2f}s")
# 并发等待 - 总耗时约1.5秒(最长任务的时间)
start = time.time()
await asyncio.gather(
fetch_data("A", 1),
fetch_data("B", 1.5),
fetch_data("C", 0.5)
)
concurrent_time = time.time() - start
print(f"并发方式耗时: {concurrent_time:.2f}s")
print(f"性能提升: {sequential_time/concurrent_time:.2f}x")
asyncio.run(main())3.3 任务生命周期管理
Task 对象提供了一系列方法来管理任务的整个生命周期。
常用方法:
| 方法 | 描述 |
|---|---|
task.done() | 检查任务是否完成 |
task.cancelled() | 检查任务是否被取消 |
task.result() | 获取任务结果(任务完成后) |
task.cancel() | 取消任务 |
import asyncio
async def data_processing_task(task_id, duration, should_fail=False):
"""模拟数据处理任务"""
try:
print(f"任务{task_id}开始处理")
await asyncio.sleep(duration)
if should_fail:
raise ValueError(f"任务{task_id}处理失败")
print(f"任务{task_id}处理完成")
return f"任务{task_id}的结果"
except asyncio.CancelledError:
print(f"任务{task_id}被取消,正在清理资源...")
raise # 重新抛出CancelledError
async def monitor_tasks():
"""演示任务生命周期管理的各种方法"""
print("=== 任务生命周期管理演示 ===\n")
# 创建多个任务
task1 = asyncio.create_task(data_processing_task(1, 1.0))
task2 = asyncio.create_task(data_processing_task(2, 2.0))
task3 = asyncio.create_task(data_processing_task(3, 0.5, should_fail=True))
task4 = asyncio.create_task(data_processing_task(4, 3.0))
# 1. 检查任务是否完成 - done()
print(f"任务1是否完成: {task1.done()}") # False,刚创建
await asyncio.sleep(0.6) # 等待一段时间
# 2. 再次检查任务状态
print(f"\n等待0.6秒后:")
print(f"任务3是否完成: {task3.done()}") # True,已完成(失败)
# 3. 获取已完成任务的异常 - exception()
try:
exc = task3.exception()
print(f"任务3的异常: {exc}")
except Exception as e:
print(f"获取异常时出错: {e}")
# 4. 取消任务 - cancel()
print(f"\n取消任务4...")
task4.cancel()
print(f"任务4是否被取消: {task4.cancelled()}") # 可能还未完全取消
# 等待任务1和任务2完成
await asyncio.sleep(1.5)
# 5. 获取任务结果 - result()
if task1.done() and not task1.cancelled():
try:
result1 = task1.result()
print(f"\n任务1的结果: {result1}")
except Exception as e:
print(f"任务1异常: {e}")
if task2.done() and not task2.cancelled():
try:
result2 = task2.result()
print(f"任务2的结果: {result2}")
except Exception as e:
print(f"任务2异常: {e}")
# 6. 处理被取消的任务
try:
await task4
except asyncio.CancelledError:
print(f"\n任务4已被取消")
print(f"任务4是否被取消: {task4.cancelled()}") # True
# 7. 总结所有任务状态
print("\n=== 最终任务状态 ===")
for i, task in enumerate([task1, task2, task3, task4], 1):
status = "已完成" if task.done() else "运行中"
if task.cancelled():
status = "已取消"
elif task.done():
try:
task.result()
except Exception:
status = "异常结束"
print(f"任务{i}: {status}")
asyncio.run(monitor_tasks())方法说明:
| 方法 | 说明 | 返回值 |
|---|---|---|
task.done() | 检查任务是否完成(正常、异常或取消) | bool |
task.cancelled() | 检查任务是否被取消 | bool |
task.result() | 获取任务结果(仅在 done()为 True 时) | 任务返回值或抛出异常 |
task.exception() | 获取任务异常(仅在 done()为 True 且有异常时) | Exception 或 None |
task.cancel() | 请求取消任务 | bool(是否成功请求取消) |
4、异步 I/O 操作
异步 I/O 是协程的主要应用场景,允许程序在等待 I/O 操作时执行其他任务。
4.1 异步 I/O 工作原理
核心机制:
- 协程执行到异步 I/O 操作时,不阻塞线程
- 将 I/O 请求提交给操作系统
- 让出控制权给事件循环
- 事件循环调度其他任务执行
- I/O 操作完成时,协程自动恢复执行
4.2 常见异步 I/O 操作
异步 I/O 操作是协程的核心应用场景。以下是 Python 生态中常见的异步 I/O 操作类型:
1. 网络 I/O:
- HTTP 请求:
aiohttp- 异步 HTTP 客户端/服务器 - WebSocket:
websockets- 异步 WebSocket 通信 - TCP/UDP:
asyncio.open_connection()- 底层网络通信 - gRPC:
grpcio- 异步 RPC 调用
2. 文件 I/O:
- 文件读写:
aiofiles- 异步文件操作 - 文件监控:
watchfiles- 异步文件系统监控
3. 数据库 I/O:
- PostgreSQL:
asyncpg- 异步 PostgreSQL 驱动 - MySQL:
aiomysql- 异步 MySQL 驱动 - MongoDB:
motor- 异步 MongoDB 驱动 - Redis:
aioredis- 异步 Redis 客户端 - SQLAlchemy:
sqlalchemy[asyncio]- 异步 ORM
4. 消息队列:
- RabbitMQ:
aio-pika- 异步 AMQP 客户端 - Kafka:
aiokafka- 异步 Kafka 客户端
5. 其他 I/O:
- DNS 查询:
aiodns- 异步 DNS 解析 - SSH:
asyncssh- 异步 SSH 客户端 - 进程通信:
asyncio.create_subprocess_exec()- 异步子进程
4.2.1 异步文件操作详解
文件 I/O 是常见的阻塞操作,使用aiofiles可以实现真正的异步文件读写。
为什么需要异步文件操作:
- 标准的
open()、read()、write()会阻塞事件循环 - 大文件操作会导致其他协程无法执行
aiofiles将文件操作委托给线程池,避免阻塞
import asyncio
import aiofiles
import time
async def write_file_async(filename, content):
"""异步写入文件"""
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
await f.write(content)
print(f"异步写入完成: {filename}")
async def read_file_async(filename):
"""异步读取文件"""
async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
content = await f.read()
print(f"异步读取完成: {filename}, 长度: {len(content)}")
return content
async def process_multiple_files():
"""并发处理多个文件"""
print("=== 异步文件操作演示 ===\n")
start = time.time()
# 准备测试数据
files_data = {
'file1.txt': '这是文件1的内容\n' * 1000,
'file2.txt': '这是文件2的内容\n' * 1000,
'file3.txt': '这是文件3的内容\n' * 1000,
}
# 并发写入多个文件
write_tasks = [
write_file_async(filename, content)
for filename, content in files_data.items()
]
await asyncio.gather(*write_tasks)
# 并发读取多个文件
read_tasks = [
read_file_async(filename)
for filename in files_data.keys()
]
contents = await asyncio.gather(*read_tasks)
elapsed = time.time() - start
print(f"\n总耗时: {elapsed:.2f}秒")
print(f"处理文件数: {len(files_data)}")
# 运行示例 (需要先安装: pip install aiofiles)
# asyncio.run(process_multiple_files())关键点:
- 使用
async with确保文件正确关闭 - 所有文件操作都使用
await - 多个文件可以并发处理,提高效率
- 适合处理大量小文件或多个大文件的场景
4.2.2 异步延迟
asyncio.sleep()是最基础的异步操作,用于模拟 I/O 延迟。与time.sleep()不同,它不会阻塞事件循环。
并发延迟示例:
import asyncio
import time
async def task_with_delay(name, delay):
"""带延迟的任务"""
print(f"[{time.strftime('%H:%M:%S')}] {name} 开始,延迟{delay}秒")
await asyncio.sleep(delay)
print(f"[{time.strftime('%H:%M:%S')}] {name} 完成")
return f"{name}结果"
async def concurrent_delays():
"""演示并发延迟"""
print("=== 并发延迟演示 ===\n")
start = time.time()
# 并发执行多个带延迟的任务
# 注意: task_with_delay内部使用了await asyncio.sleep(delay)
# 这会暂停当前协程,让出无法控制权给其他任务
results = await asyncio.gather(
task_with_delay("任务A", 2),
task_with_delay("任务B", 1),
task_with_delay("任务C", 1.5)
)
elapsed = time.time() - start
print(f"\n总耗时: {elapsed:.2f}秒 (并发执行)")
print(f"如果顺序执行需要: 4.5秒")
print(f"结果: {results}")
asyncio.run(concurrent_delays())关键点:
asyncio.sleep()让出控制权,允许其他协程执行- 多个延迟可以并发执行,总耗时取决于最长延迟
- 常用于实现重试逻辑、速率限制等场景
- 永远不要在协程中使用
time.sleep(),它会阻塞整个事件循环
4.2.3 第三方异步库
许多第三方库提供了异步接口,如:
- aiohttp:异步 HTTP 请求库
- asyncpg:异步 PostgreSQL 数据库驱动
- motor:异步 MongoDB 驱动
- aiofiles:异步文件操作
aiohttp 详解:
aiohttp是最流行的异步 HTTP 客户端库,支持客户端和服务器端功能。
基础用法:
# 需要先安装: pip install aiohttp
import asyncio
import aiohttp
async def fetch_single_url():
"""获取单个URL"""
async with aiohttp.ClientSession() as session:
async with session.get('https://httpbin.org/json') as response:
# 检查响应状态
print(f"状态码: {response.status}")
# 获取JSON数据
data = await response.json()
print(f"获取到数据: {data}")
return data
# asyncio.run(fetch_single_url())并发请求示例:
import asyncio
import aiohttp
import time
async def fetch_url(session, url, url_id):
"""使用共享session获取URL"""
try:
print(f"开始请求 URL-{url_id}: {url}")
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
# 获取响应内容
data = await response.json()
print(f"完成请求 URL-{url_id}, 状态码: {response.status}")
return {
'url_id': url_id,
'status': response.status,
'data': data
}
except asyncio.TimeoutError:
print(f"URL-{url_id} 请求超时")
return {'url_id': url_id, 'error': 'timeout'}
except aiohttp.ClientError as e:
print(f"URL-{url_id} 请求失败: {e}")
return {'url_id': url_id, 'error': str(e)}
async def fetch_multiple_urls():
"""并发获取多个URL"""
print("=== aiohttp并发请求演示 ===\n")
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/2',
'https://httpbin.org/json',
'https://httpbin.org/uuid',
]
start = time.time()
# 创建共享session(重要:复用连接,提高性能)
async with aiohttp.ClientSession() as session:
# 创建并发任务
tasks = [
fetch_url(session, url, i)
for i, url in enumerate(urls, 1)
]
# 并发执行所有请求
results = await asyncio.gather(*tasks, return_exceptions=True)
elapsed = time.time() - start
# 统计结果
successful = sum(1 for r in results if isinstance(r, dict) and 'data' in r)
print(f"\n总耗时: {elapsed:.2f}秒")
print(f"成功: {successful}/{len(urls)}")
print(f"如果顺序请求大约需要: {sum([1, 2, 1, 1])}秒")
# asyncio.run(fetch_multiple_urls())关键点:
- 使用
ClientSession: 创建 session 并在多个请求间复用,提高性能 async with管理资源: 确保连接正确关闭- 并发请求: 使用
asyncio.gather()并发执行多个请求 - 错误处理: 捕获
ClientError和TimeoutError - 超时控制: 使用
ClientTimeout设置超时时间
5、协程与阻塞问题
在协程编程中,阻塞操作是最常见的性能陷阱。理解什么是阻塞、如何识别阻塞以及如何正确处理阻塞代码,是编写高效异步程序的关键。
5.1 阻塞操作的影响
在协程中使用阻塞操作会冻结整个事件循环,导致所有其他协程无法执行。
常见阻塞操作:
time.sleep():完全阻塞事件循环requests.get():网络请求阻塞- CPU 密集计算:长时间占用 CPU
- 文件同步操作:如
open().read()
5.2 识别"假异步库"
一些库看起来异步,但内部使用同步操作,同样会阻塞事件循环。
识别方法:
- 检查库的实现,看其内部是否使用了真正的异步 I/O
- 测试库的并发性能
- 查看库的文档说明
5.3 解决方案:在协程中运行同步代码
当必须使用同步代码(如使用了同步库或进行密集计算)时,不能直接调用,而应该将其放入线程池或进程池中运行,以避免阻塞事件循环。
5.3.1 方案一: asyncio.to_thread() (推荐, Python 3.9+)
这是处理I/O 密集型阻塞操作(如文件读写、requests 请求)的最简单的现代方法。它本质上是将任务提交给默认的线程池。
import asyncio
import time
import threading
def blocking_io():
"""模拟阻塞I/O操作"""
print(f"开始阻塞操作 (线程: {threading.current_thread().name})")
time.sleep(1) # 阻塞1秒
return "完成"
async def main():
print(f"主协程开始 (线程: {threading.current_thread().name})")
# 自动在单独的线程中运行,不阻塞主循环
result = await asyncio.to_thread(blocking_io)
print(f"结果: {result}")
# asyncio.run(main())5.3.2 方案二: loop.run_in_executor() (更灵活/CPU 密集型)
这是更底层的 API,允许你自定义执行器(Executor)。特别是对于CPU 密集型任务,必须使用ProcessPoolExecutor来规避 GIL 锁,实现真正的并行。
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def cpu_bound_task(n):
"""模拟CPU密集型计算"""
count = 0
for i in range(n):
count += i
return count
async def main():
loop = asyncio.get_running_loop()
# 1. 使用默认线程池 (等同于 to_thread)
# result1 = await loop.run_in_executor(None, blocking_io)
# 2. 使用进程池 (推荐用于 CPU 密集型)
# 注意: 创建进程池开销较大,通常复用
with ProcessPoolExecutor() as pool:
print("开始CPU密集型计算...")
result = await loop.run_in_executor(pool, cpu_bound_task, 10**7)
print(f"计算结果: {result}")
if __name__ == '__main__':
# Windows下使用多进程需要保护入口
asyncio.run(main())5.3.3 方案对比与选择指南
| 场景 | 推荐方案 | 原理 |
|---|---|---|
| I/O 密集型 (读写文件、同步网络请求) | asyncio.to_thread() | 使用线程池。I/O 操作期间释放 GIL,多线程可并发。 |
| CPU 密集型 (图像处理、复杂计算) | run_in_executor + ProcessPoolExecutor | 使用进程池。每个进程有独立的解释器和 GIL,实现真并行。 |
| 兼容旧版本 (Python < 3.9) | run_in_executor(None, ...) | 使用默认线程池运行。 |
6、异常、超时与取消
在异步编程中,异常处理、超时控制和任务取消是构建健壮应用程序的重要组成部分。
6.1 异常传播规则
在协程中,异常会按照特定的规则传播。当协程内部发生异常且未被捕获时,异常会向上层传播。
异常处理方式:
import asyncio
async def task_with_exception():
raise ValueError("任务中发生异常")
async def main():
try:
await task_with_exception()
except ValueError as e:
print(f"捕获异常: {e}")
asyncio.run(main())6.2 gather 的异常处理
asyncio.gather()在处理多个任务时有特定的异常处理行为。
两种异常处理策略:
return_exceptions=False(默认):第一个异常立即传播return_exceptions=True:所有结果(包括异常)作为结果返回
import asyncio
async def success_task():
await asyncio.sleep(0.1)
return "成功"
async def failure_task():
await asyncio.sleep(0.2)
raise ValueError("失败")
async def main():
# 使用return_exceptions=True收集异常
results = await asyncio.gather(
success_task(),
failure_task(),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务 {i} 异常: {result}")
else:
print(f"任务 {i} 结果: {result}")
asyncio.run(main())6.3 超时控制
超时控制是防止任务无限期等待的重要机制。
超时机制的重要性:
- 防止网络请求或 I/O 操作无响应
- 避免程序挂起
- 提高系统稳定性
import asyncio
async def slow_operation():
await asyncio.sleep(3)
return "慢操作完成"
async def main():
try:
# 使用超时控制
result = await asyncio.wait_for(slow_operation(), timeout=2.0)
print(result)
except asyncio.TimeoutError:
print("操作超时")
asyncio.run(main())6.4 任务取消机制
任务取消是 asyncio 的重要功能,允许在运行时取消正在执行的任务。
取消工作原理:
- 调用任务的
cancel()方法 - 任务内部抛出
CancelledError异常 - 可以在协程中捕获此异常并进行清理工作
import asyncio
async def cancellable_task():
try:
print("任务开始")
await asyncio.sleep(2)
print("任务完成")
return "任务完成"
except asyncio.CancelledError:
print("任务被取消,进行清理...")
raise # 重新抛出CancelledError
async def main():
# 创建任务
task = asyncio.create_task(cancellable_task())
# 等待一小段时间后取消任务
await asyncio.sleep(0.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("任务已取消")
asyncio.run(main())7、同步原语
在并发编程中,同步原语用于协调多个协程对共享资源的访问,避免竞态条件。
7.1 异步锁(asyncio.Lock)
异步锁用于保护临界区,确保同一时间只有一个协程可以访问受保护的代码段。
异步锁的特点:
- 与线程锁类似,但与 asyncio 事件循环兼容
- 不会阻塞事件循环
- 保证原子操作
import asyncio
# 共享资源
counter = 0
lock = asyncio.Lock()
async def increment():
global counter
async with lock: # 保证原子操作
temp = counter
await asyncio.sleep(0.1) # 模拟处理时间
counter = temp + 1
print(f"计数器: {counter}")
async def main():
# 并发执行多个increment操作
await asyncio.gather(increment(), increment(), increment())
print(f"最终计数: {counter}")
asyncio.run(main())7.2 异步信号量(asyncio.Semaphore)
异步信号量用于限制同时访问特定资源的协程数量。
应用场景:
- 限制同时进行的网络连接数
- 控制数据库连接池大小
- 限制 API 请求频率
import asyncio
async def access_resource(name, semaphore):
async with semaphore: # 限制并发数
print(f"{name} 获得资源访问权")
await asyncio.sleep(1) # 模拟资源使用时间
print(f"{name} 释放资源")
async def main():
# 限制最多2个协程同时访问资源
semaphore = asyncio.Semaphore(2)
tasks = [
access_resource(f"任务-{i}", semaphore)
for i in range(5)
]
await asyncio.gather(*tasks)
asyncio.run(main())7.3 异步队列(asyncio.Queue)
异步队列是协程间通信的重要工具,特别适用于生产者-消费者模式。
队列的异步特性:
- 线程安全
- 支持异步的 put 和 get 操作
- 队列满时 put 会等待,队列空时 get 会等待
- 这些操作都不会阻塞事件循环
import asyncio
async def producer(queue):
"""生产者:向队列添加数据"""
for i in range(5):
item = f"Item-{i}"
await queue.put(item)
print(f"生产: {item}")
await asyncio.sleep(0.5)
async def consumer(queue, name):
"""消费者:从队列取出数据"""
while True:
try:
item = await asyncio.wait_for(queue.get(), timeout=2.0)
print(f"{name} 消费: {item}")
queue.task_done()
except asyncio.TimeoutError:
break
async def main():
queue = asyncio.Queue()
# 使用队列
await asyncio.gather(
producer(queue),
consumer(queue, "消费者-1"),
consumer(queue, "消费者-2")
)
asyncio.run(main())8、结构化并发
结构化并发(Structured Concurrency)是一种编程范式,它确保并发任务的生命周期得到正确管理,避免任务泄漏和异常处理问题。
8.1 结构化并发的核心概念
结构化并发(Structured Concurrency)是一种编程范式,旨在让并发代码像顺序代码一样清晰、可预测、可维护。它强调:子任务的生命周期必须被其父任务显式管理,避免"孤儿任务"(即无主、无法追踪的并发任务)。
为什么需要结构化并发?
传统并发(如线程、裸协程)常见问题:
- 任务无法追踪:任务启动后,父任务无法确认其完成或取消状态
- 异常丢失:子任务的异常无法自动传播回父上下文
- 资源泄漏:程序退出时可能残留后台任务,导致连接或文件句柄未释放
核心原则:
- 作用域绑定: 子任务必须在父任务的作用域内启动和完成,不能"逃逸"到外部
- 生命周期管理: 父任务结束时,自动取消所有未完成的子任务
- 异常传播: 任一子任务出错,应取消其余子任务并向上抛出
- 栈式结构: 作用域是栈式(stack-like)的,符合"先进后出"的控制流
8.1.1 反面教材:非结构化并发
import asyncio
async def child_task():
print("子任务开始")
await asyncio.sleep(1)
print("子任务结束")
async def bad_example():
print("父任务开始")
# 启动任务但不等待 -> 成为孤儿!
asyncio.create_task(child_task())
print("父任务结束")
# 程序可以在此退出,child_task 被静默丢弃
# 导致:
# 1. "子任务结束" 永远不会被打印
# 2. 如果 child_task 中有资源清理代码,将不会执行
# asyncio.run(bad_example())8.2 Structured Concurrency 实践 (TaskGroup)
Python 3.11+ 引入了 asyncio.TaskGroup(受 Trio 启发),提供了标准的结构化并发支持。
8.2.1 使用 TaskGroup
import asyncio
import time
async def child_task(id, delay):
print(f"子任务 {id} 启动")
await asyncio.sleep(delay)
print(f"子任务 {id} 完成")
return id
async def good_example():
print("=== 结构化并发示例 ===")
start = time.time()
try:
# async with 创建一个任务作用域
async with asyncio.TaskGroup() as tg:
# 在作用域内创建任务
task1 = tg.create_task(child_task(1, 1))
task2 = tg.create_task(child_task(2, 2))
print("父任务已启动所有子任务")
# 离开 async with 块时:
# 1. 自动挂起父任务,等待所有子任务完成
# 2. 如果发生异常,自动取消其他子任务
except Exception as e:
print(f"捕获异常: {e}")
print(f"所有任务完成, 总耗时: {time.time() - start:.2f}秒")
# asyncio.run(good_example())8.2.2 关键机制解析
| 机制 | 说明 |
|---|---|
| TaskGroup | 提供结构化作用域,管理子任务生命周期 |
| 自动等待 (Join) | 离开作用域时自动 await 所有子任务 |
| 自动取消 (Cancel) | 出现异常或提前退出时,自动取消其余任务 |
| 异常聚合 | 多个异常可被收集 (Python 3.11+ 支持 ExceptionGroup) |
9、什么时候不要用协程
尽管协程在 I/O 密集型任务中表现出色,但并不适合所有场景。
9.1 不适合协程的场景
1. CPU 密集型算法:
- 数值计算、图像处理、加密解密
- 需要持续占用 CPU,无法利用异步的优势
2. 实时控制/硬实时:
- 需要精确时间控制的场景
- 协程的调度可能导致时间不可预测
3. 需要多核并行:
- 真正需要并行计算的场景
- 协程在单线程中运行无法实现真正的并行
9.2 替代方案
对于 CPU 密集型任务,使用多进程是更好的选择:
import multiprocessing
def cpu_intensive_task(data):
"""CPU密集型任务"""
result = 0
for i in range(data * 1000000):
result += i * i
return result
if __name__ == '__main__':
data_list = [10, 20, 30]
with multiprocessing.Pool() as pool:
results = pool.map(cpu_intensive_task, data_list)
print(f"多进程处理结果: {results}")场景选择指南:
| 场景类型 | 推荐方案 | 原因 |
|---|---|---|
| I/O 密集型 | 协程 | 高并发,轻量级 |
| CPU 密集型 | 多进程 | 利用多核,真正并行 |
| 混合型 | 协程+多进程 | 协程处理 I/O,进程处理 CPU |
| 大量短任务 | 协程 | 创建销毁开销小 |
| 实时性要求高 | 多线程 | 更可预测的调度 |
总结
协程和 asyncio 是处理 I/O 密集型任务的强大工具,关键要点:
核心概念
- 理解本质:协程是协作式多任务,不是线程,运行在单线程+事件循环模型上
- 掌握机制:
await是唯一让出点,事件循环负责调度 - 状态管理:理解协程的 5 种状态及其切换过程
最佳实践
- 避免阻塞:永远不要在协程中使用阻塞操作(
time.sleep()、requests等) - 合理并发:使用
asyncio.gather()等并发 API,而非顺序 await - 正确处理同步代码:使用
asyncio.to_thread()或run_in_executor() - 异常处理:正确处理异常、超时和取消
- 资源控制:使用信号量等同步原语控制并发度
适用场景
- 推荐使用:I/O 密集型任务(网络请求、文件读写、数据库操作)
- 不推荐使用:CPU 密集型任务(数值计算、图像处理等)
- 替代方案:CPU 密集型任务使用多进程
关键记忆
| 概念 | 要点 |
|---|---|
| 协程 vs 线程 | 协作式调度 vs 抢占式调度 |
| 事件循环 | 协程调度的核心,维护任务队列 |
| await | 暂停协程,让出控制权的唯一机制 |
| 并发 API | gather/wait/create_task |
| 同步原语 | Lock/Semaphore/Queue |
| 适用场景 | I/O 密集型任务,非 CPU 密集型 |
| 性能陷阱 | 阻塞操作、CPU 密集型、顺序 await |