Skip to content

第四部分:asyncio 协程编程

协程(Coroutine)与 asyncio 库是 Python 异步编程的核心,提供了高效的 I/O 密集型任务处理方式。理解协程的本质和 asyncio 的工作原理是掌握异步编程的关键。

1、协程的本质认知

1.1 协程核心概念

1.1.1 协程的定义

协程是一种特殊的函数,可以在执行过程中暂停并从暂停处恢复执行。与线程的关键区别:

特性协程线程
调度方式用户态,程序控制(协作式调度)操作系统控制(抢占式调度)
运行模型单线程 + 事件循环多线程
切换开销极小(无上下文切换)较大(需要保存/恢复线程状态)
让出控制主动让出(await关键字)操作系统抢占
并发能力高并发(适合 I/O 密集型)有限并发(受线程数限制)
资源消耗轻量级(可创建成千上万个协程)重量级(线程数量受限)
共享数据无需锁(单线程内自然安全,除非有阻塞操作)需要锁(多线程竞争)

1.1.2 协程工作机制

核心组件:

  1. 事件循环(Event Loop):协程调度的核心,负责管理和调度所有协程

    • 维护就绪任务队列和等待任务队列
    • 不断检查并执行就绪任务
    • 监听 I/O 事件并唤醒相应协程
  2. await 关键字:协程暂停和恢复的唯一机制

    • 暂停当前协程执行
    • 将控制权交还给事件循环
    • 等待被 await 的对象完成
    • 恢复协程执行并返回结果

协作式调度原理:

  • 协程主动通过await让出控制权,而非操作系统抢占
  • 避免了锁竞争和死锁问题
  • 要求开发者避免长时间运行而不让出控制权的代码

1.2 协程状态与生命周期

1.2.1 协程的五种状态

协程在其生命周期中经历以下状态:

  1. Created(创建):协程对象被创建但未执行
  2. Running(运行):协程正在执行
  3. Suspended(暂停):协程因await暂停,等待 I/O 完成
  4. Completed(完成):协程正常结束
  5. Cancelled(取消):协程被外部取消

1.2.2 状态切换流程

Created → Running → Suspended → Running → Completed

                    Cancelled

详细切换过程:

  1. 协程开始执行 → Running
  2. 遇到awaitSuspended,事件循环调度其他任务
  3. I/O 操作完成 → 重新进入就绪队列 → Running
  4. 协程函数结束 → Completed

1.3 协程的适用场景与限制

1.3.1 协程的性能优势

协程在以下场景表现出色:

  • I/O 密集型任务:网络请求、文件读写、数据库操作
  • 高并发场景:需要同时处理大量连接(如 Web 服务器)
  • 异步 API 调用:多个 API 并发请求

1.3.2 协程的性能陷阱

协程并非总是更快,以下情况可能导致性能下降:

1. CPU 密集型任务:

  • 协程运行在单线程,无法利用多核
  • 持续占用 CPU,无法让出控制权
  • 无 I/O 等待期间,协程并发优势消失

2. 过度的上下文切换:

  • 频繁的await调用增加调度开销
  • 简单操作的await开销可能超过收益

3. 不合理的并发设计:

  • 按顺序await每个任务,失去并发优势
  • 应使用asyncio.gather()等并发 API

1.3.3 阻塞操作的灾难

为什么time.sleep()会卡死 asyncio:

  • time.sleep()是同步阻塞操作,会阻塞整个线程
  • 在 asyncio 的单线程环境中,阻塞线程等于阻塞事件循环
  • 事件循环被阻塞,所有协程都无法获得执行机会

正确的做法:

  • 使用await asyncio.sleep()代替time.sleep()
  • 使用异步库代替同步库(如aiohttp代替requests)
  • 必须使用同步代码时,使用asyncio.to_thread()run_in_executor()

2、基础语法与运行模型

2.1 async/await 基础

2.1.1 async 定义协程函数

async def是定义协程函数的关键语法,它告诉 Python 解释器这个函数将返回一个协程对象。

协程函数的特性:

  • 调用协程函数不会立即执行函数体,而是返回一个协程对象
  • 协程对象需要通过事件循环来执行
  • 协程对象表示一个可执行的协程实例

协程函数 vs 协程对象:

概念定义说明
协程函数async def定义的函数是一个可调用的对象
协程对象调用协程函数返回的对象需要事件循环执行
python
import asyncio

async def simple_coroutine():
    """定义协程函数"""
    print("协程开始执行")
    await asyncio.sleep(1)  # 模拟异步操作
    print("协程执行完成")
    return "协程结果"

# 调用协程函数返回协程对象,不执行
coro = simple_coroutine()
print(type(coro))  # <class 'coroutine'>
coro.close()  # 显式关闭未执行的协程,避免RuntimeWarning

# 执行协程
result = asyncio.run(simple_coroutine())
print(result)  # "协程结果"

2.1.2 await 语义详解

await关键字的作用:

  1. 暂停当前协程的执行
  2. 将控制权交还给事件循环
  3. 等待 await 对象完成
  4. 恢复协程执行并返回结果

await 只能用于:

  • async def定义的函数内部
  • 等待可等待对象(Awaitable):协程、Task、Future

2.2 事件循环

2.2.1 Event Loop 的职责

事件循环是 asyncio 的核心组件,主要职责包括:

  1. 管理和调度协程任务:维护待执行、运行中和暂停的任务列表
  2. 监听 I/O 事件:使用高效的 I/O 多路复用机制(如 epoll、kqueue)
  3. 执行回调函数:管理定时器和 I/O 完成时的回调
  4. 处理定时器:管理任务的超时和延迟执行

2.2.2 任务队列管理

事件循环维护多个队列来管理不同类型的任务:

  • 就绪队列:待执行的任务,已准备好运行或刚被唤醒
  • 等待队列:等待 I/O 完成的任务,处于暂停状态
  • 回调队列:等待执行的回调函数

调度策略:按照先进先出(FIFO)顺序从就绪队列中取出任务执行。

2.2.3 常用接口

python
import asyncio

async def demonstrate_loop_api():
    # 获取当前运行的事件循环
    loop = asyncio.get_running_loop()
    print(f"当前循环: {loop}")

    # 获取循环信息
    print(f"循环是否运行: {loop.is_running()}")

# asyncio.run()内部自动创建和管理事件循环
asyncio.run(demonstrate_loop_api())

2.2.4 事件循环工作流程示例

以下示例展示事件循环如何调度和管理多个协程任务:

python
import asyncio
import time

async def task_a():
    """任务A:模拟I/O操作"""
    print(f"[{time.strftime('%H:%M:%S')}] 任务A开始执行")
    await asyncio.sleep(0.5)  # 模拟I/O等待,让出控制权
    print(f"[{time.strftime('%H:%M:%S')}] 任务A恢复执行")
    await asyncio.sleep(0.3)
    print(f"[{time.strftime('%H:%M:%S')}] 任务A完成")
    return "结果A"

async def task_b():
    """任务B:模拟I/O操作"""
    print(f"[{time.strftime('%H:%M:%S')}] 任务B开始执行")
    await asyncio.sleep(0.3)  # 让出控制权
    print(f"[{time.strftime('%H:%M:%S')}] 任务B恢复执行")
    await asyncio.sleep(0.2)
    print(f"[{time.strftime('%H:%M:%S')}] 任务B完成")
    return "结果B"

async def task_c():
    """任务C:模拟I/O操作"""
    print(f"[{time.strftime('%H:%M:%S')}] 任务C开始执行")
    await asyncio.sleep(0.2)  # 让出控制权
    print(f"[{time.strftime('%H:%M:%S')}] 任务C恢复执行")
    print(f"[{time.strftime('%H:%M:%S')}] 任务C完成")
    return "结果C"

async def main():
    """演示事件循环的任务调度"""
    print("=== 事件循环任务调度演示 ===\n")
    start = time.time()

    # 创建三个任务,它们会被加入事件循环的就绪队列
    task1 = asyncio.create_task(task_a())
    task2 = asyncio.create_task(task_b())
    task3 = asyncio.create_task(task_c())

    # 等待所有任务完成
    results = await asyncio.gather(task1, task2, task3)

    elapsed = time.time() - start
    print(f"\n总耗时: {elapsed:.2f}秒")
    print(f"任务结果: {results}")

asyncio.run(main())

输出分析:

从输出可以看到:

  1. 三个任务几乎同时开始执行(被加入就绪队列)
  2. 当任务遇到await asyncio.sleep()时,主动让出控制权
  3. 事件循环调度其他就绪任务执行
  4. I/O 完成后,任务重新进入就绪队列并恢复执行
  5. 总耗时约 0.8 秒(而非顺序执行的 1.5 秒),体现了并发优势

3、并发调度的核心 API

asyncio 提供了丰富的 API 来管理并发任务,是实现高效异步编程的关键。

3.1 创建并发任务

3.1.1 asyncio.create_task()

create_task()将协程封装为 Task 对象,立即加入事件循环准备执行。

Task 对象的特点:

  • 提供对协程执行状态的控制能力
  • 可以检查完成状态、取消执行等
  • 立即调度到事件循环,与其他任务并发执行
python
import asyncio

async def task(name, delay):
    print(f"任务 {name} 开始")
    await asyncio.sleep(delay)
    print(f"任务 {name} 完成")
    return f"结果-{name}"

async def main():
    # create_task立即调度任务,不会阻塞
    task1 = asyncio.create_task(task("A", 1))
    task2 = asyncio.create_task(task("B", 1))

    # 等待两个任务完成
    results = await asyncio.gather(task1, task2)
    print(f"所有结果: {results}")

asyncio.run(main())

3.1.2 asyncio.gather()

gather()并发执行多个可等待对象,并收集它们的结果。这是最常用的并发执行方式。

gather 的特点:

  • 将所有传入的可等待对象包装成任务并并发执行
  • 等待所有任务完成后按顺序返回结果
  • 支持return_exceptions=True参数,收集异常而非立即传播
  • 返回结果是一个列表,顺序与传入的顺序一致!
python
import asyncio

async def fetch_data(source, delay):
    print(f"开始从{source}获取数据")
    await asyncio.sleep(delay)
    print(f"从{source}获取数据完成")
    return f"{source}数据"

async def main():
    # 并发执行多个任务,收集所有结果
    results = await asyncio.gather(
        fetch_data("服务器A", 1),
        fetch_data("服务器B", 0.8),
        fetch_data("服务器C", 1.2)
    )
    print(f"所有结果: {results}")

asyncio.run(main())

3.1.3 asyncio.wait()

wait()提供更灵活的等待控制,可以等待特定条件。

wait 的灵活性:

  • FIRST_COMPLETED:等待第一个任务完成
  • ALL_COMPLETED:等待所有任务完成(默认)
  • FIRST_EXCEPTION:等待第一个异常
python
import asyncio

async def worker(name, delay):
    await asyncio.sleep(delay)
    return f"{name}完成"

async def main():
    # 创建多个任务
    tasks = [
        asyncio.create_task(worker("任务1", 1)),
        asyncio.create_task(worker("任务2", 2)),
    ]

    # 等待第一个任务完成
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"第一个完成的任务: {done.pop().result()}")

    # 取消剩余任务
    for task in pending:
        task.cancel()

asyncio.run(main())

3.2 顺序 vs 并发 await

性能差异原理:

  • 顺序等待:一个任务完成后才开始下一个,总耗时 = 各任务耗时之和
  • 并发等待:多个任务同时运行,总耗时 ≈ 最长任务的耗时
python
import asyncio
import time

async def fetch_data(source, delay):
    await asyncio.sleep(delay)
    return f"{source}数据"

async def main():
    # 顺序等待 - 总耗时约3秒
    start = time.time()
    await fetch_data("A", 1)
    await fetch_data("B", 1.5)
    await fetch_data("C", 0.5)
    sequential_time = time.time() - start
    print(f"顺序方式耗时: {sequential_time:.2f}s")

    # 并发等待 - 总耗时约1.5秒(最长任务的时间)
    start = time.time()
    await asyncio.gather(
        fetch_data("A", 1),
        fetch_data("B", 1.5),
        fetch_data("C", 0.5)
    )
    concurrent_time = time.time() - start
    print(f"并发方式耗时: {concurrent_time:.2f}s")
    print(f"性能提升: {sequential_time/concurrent_time:.2f}x")

asyncio.run(main())

3.3 任务生命周期管理

Task 对象提供了一系列方法来管理任务的整个生命周期。

常用方法:

方法描述
task.done()检查任务是否完成
task.cancelled()检查任务是否被取消
task.result()获取任务结果(任务完成后)
task.cancel()取消任务
python
import asyncio

async def data_processing_task(task_id, duration, should_fail=False):
    """模拟数据处理任务"""
    try:
        print(f"任务{task_id}开始处理")
        await asyncio.sleep(duration)

        if should_fail:
            raise ValueError(f"任务{task_id}处理失败")

        print(f"任务{task_id}处理完成")
        return f"任务{task_id}的结果"
    except asyncio.CancelledError:
        print(f"任务{task_id}被取消,正在清理资源...")
        raise  # 重新抛出CancelledError

async def monitor_tasks():
    """演示任务生命周期管理的各种方法"""
    print("=== 任务生命周期管理演示 ===\n")

    # 创建多个任务
    task1 = asyncio.create_task(data_processing_task(1, 1.0))
    task2 = asyncio.create_task(data_processing_task(2, 2.0))
    task3 = asyncio.create_task(data_processing_task(3, 0.5, should_fail=True))
    task4 = asyncio.create_task(data_processing_task(4, 3.0))

    # 1. 检查任务是否完成 - done()
    print(f"任务1是否完成: {task1.done()}")  # False,刚创建

    await asyncio.sleep(0.6)  # 等待一段时间

    # 2. 再次检查任务状态
    print(f"\n等待0.6秒后:")
    print(f"任务3是否完成: {task3.done()}")  # True,已完成(失败)

    # 3. 获取已完成任务的异常 - exception()
    try:
        exc = task3.exception()
        print(f"任务3的异常: {exc}")
    except Exception as e:
        print(f"获取异常时出错: {e}")

    # 4. 取消任务 - cancel()
    print(f"\n取消任务4...")
    task4.cancel()
    print(f"任务4是否被取消: {task4.cancelled()}")  # 可能还未完全取消

    # 等待任务1和任务2完成
    await asyncio.sleep(1.5)

    # 5. 获取任务结果 - result()
    if task1.done() and not task1.cancelled():
        try:
            result1 = task1.result()
            print(f"\n任务1的结果: {result1}")
        except Exception as e:
            print(f"任务1异常: {e}")

    if task2.done() and not task2.cancelled():
        try:
            result2 = task2.result()
            print(f"任务2的结果: {result2}")
        except Exception as e:
            print(f"任务2异常: {e}")

    # 6. 处理被取消的任务
    try:
        await task4
    except asyncio.CancelledError:
        print(f"\n任务4已被取消")
        print(f"任务4是否被取消: {task4.cancelled()}")  # True

    # 7. 总结所有任务状态
    print("\n=== 最终任务状态 ===")
    for i, task in enumerate([task1, task2, task3, task4], 1):
        status = "已完成" if task.done() else "运行中"
        if task.cancelled():
            status = "已取消"
        elif task.done():
            try:
                task.result()
            except Exception:
                status = "异常结束"
        print(f"任务{i}: {status}")

asyncio.run(monitor_tasks())

方法说明:

方法说明返回值
task.done()检查任务是否完成(正常、异常或取消)bool
task.cancelled()检查任务是否被取消bool
task.result()获取任务结果(仅在 done()为 True 时)任务返回值或抛出异常
task.exception()获取任务异常(仅在 done()为 True 且有异常时)Exception 或 None
task.cancel()请求取消任务bool(是否成功请求取消)

4、异步 I/O 操作

异步 I/O 是协程的主要应用场景,允许程序在等待 I/O 操作时执行其他任务。

4.1 异步 I/O 工作原理

核心机制:

  1. 协程执行到异步 I/O 操作时,不阻塞线程
  2. 将 I/O 请求提交给操作系统
  3. 让出控制权给事件循环
  4. 事件循环调度其他任务执行
  5. I/O 操作完成时,协程自动恢复执行

4.2 常见异步 I/O 操作

异步 I/O 操作是协程的核心应用场景。以下是 Python 生态中常见的异步 I/O 操作类型:

1. 网络 I/O:

  • HTTP 请求: aiohttp - 异步 HTTP 客户端/服务器
  • WebSocket: websockets - 异步 WebSocket 通信
  • TCP/UDP: asyncio.open_connection() - 底层网络通信
  • gRPC: grpcio - 异步 RPC 调用

2. 文件 I/O:

  • 文件读写: aiofiles - 异步文件操作
  • 文件监控: watchfiles - 异步文件系统监控

3. 数据库 I/O:

  • PostgreSQL: asyncpg - 异步 PostgreSQL 驱动
  • MySQL: aiomysql - 异步 MySQL 驱动
  • MongoDB: motor - 异步 MongoDB 驱动
  • Redis: aioredis - 异步 Redis 客户端
  • SQLAlchemy: sqlalchemy[asyncio] - 异步 ORM

4. 消息队列:

  • RabbitMQ: aio-pika - 异步 AMQP 客户端
  • Kafka: aiokafka - 异步 Kafka 客户端

5. 其他 I/O:

  • DNS 查询: aiodns - 异步 DNS 解析
  • SSH: asyncssh - 异步 SSH 客户端
  • 进程通信: asyncio.create_subprocess_exec() - 异步子进程

4.2.1 异步文件操作详解

文件 I/O 是常见的阻塞操作,使用aiofiles可以实现真正的异步文件读写。

为什么需要异步文件操作:

  • 标准的open()read()write()会阻塞事件循环
  • 大文件操作会导致其他协程无法执行
  • aiofiles将文件操作委托给线程池,避免阻塞
python
import asyncio
import aiofiles
import time

async def write_file_async(filename, content):
    """异步写入文件"""
    async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
        await f.write(content)
        print(f"异步写入完成: {filename}")

async def read_file_async(filename):
    """异步读取文件"""
    async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
        content = await f.read()
        print(f"异步读取完成: {filename}, 长度: {len(content)}")
        return content

async def process_multiple_files():
    """并发处理多个文件"""
    print("=== 异步文件操作演示 ===\n")
    start = time.time()

    # 准备测试数据
    files_data = {
        'file1.txt': '这是文件1的内容\n' * 1000,
        'file2.txt': '这是文件2的内容\n' * 1000,
        'file3.txt': '这是文件3的内容\n' * 1000,
    }

    # 并发写入多个文件
    write_tasks = [
        write_file_async(filename, content)
        for filename, content in files_data.items()
    ]
    await asyncio.gather(*write_tasks)

    # 并发读取多个文件
    read_tasks = [
        read_file_async(filename)
        for filename in files_data.keys()
    ]
    contents = await asyncio.gather(*read_tasks)

    elapsed = time.time() - start
    print(f"\n总耗时: {elapsed:.2f}秒")
    print(f"处理文件数: {len(files_data)}")

# 运行示例 (需要先安装: pip install aiofiles)
# asyncio.run(process_multiple_files())

关键点:

  1. 使用async with确保文件正确关闭
  2. 所有文件操作都使用await
  3. 多个文件可以并发处理,提高效率
  4. 适合处理大量小文件或多个大文件的场景

4.2.2 异步延迟

asyncio.sleep()是最基础的异步操作,用于模拟 I/O 延迟。与time.sleep()不同,它不会阻塞事件循环。

并发延迟示例:

python
import asyncio
import time

async def task_with_delay(name, delay):
    """带延迟的任务"""
    print(f"[{time.strftime('%H:%M:%S')}] {name} 开始,延迟{delay}秒")
    await asyncio.sleep(delay)
    print(f"[{time.strftime('%H:%M:%S')}] {name} 完成")
    return f"{name}结果"

async def concurrent_delays():
    """演示并发延迟"""
    print("=== 并发延迟演示 ===\n")
    start = time.time()

    # 并发执行多个带延迟的任务
    # 注意: task_with_delay内部使用了await asyncio.sleep(delay)
    # 这会暂停当前协程,让出无法控制权给其他任务
    results = await asyncio.gather(
        task_with_delay("任务A", 2),
        task_with_delay("任务B", 1),
        task_with_delay("任务C", 1.5)
    )

    elapsed = time.time() - start
    print(f"\n总耗时: {elapsed:.2f}秒 (并发执行)")
    print(f"如果顺序执行需要: 4.5秒")
    print(f"结果: {results}")

asyncio.run(concurrent_delays())

关键点:

  1. asyncio.sleep()让出控制权,允许其他协程执行
  2. 多个延迟可以并发执行,总耗时取决于最长延迟
  3. 常用于实现重试逻辑、速率限制等场景
  4. 永远不要在协程中使用time.sleep(),它会阻塞整个事件循环

4.2.3 第三方异步库

许多第三方库提供了异步接口,如:

  • aiohttp:异步 HTTP 请求库
  • asyncpg:异步 PostgreSQL 数据库驱动
  • motor:异步 MongoDB 驱动
  • aiofiles:异步文件操作

aiohttp 详解:

aiohttp是最流行的异步 HTTP 客户端库,支持客户端和服务器端功能。

基础用法:

python
# 需要先安装: pip install aiohttp
import asyncio
import aiohttp

async def fetch_single_url():
    """获取单个URL"""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/json') as response:
            # 检查响应状态
            print(f"状态码: {response.status}")

            # 获取JSON数据
            data = await response.json()
            print(f"获取到数据: {data}")
            return data

# asyncio.run(fetch_single_url())

并发请求示例:

python
import asyncio
import aiohttp
import time

async def fetch_url(session, url, url_id):
    """使用共享session获取URL"""
    try:
        print(f"开始请求 URL-{url_id}: {url}")
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            # 获取响应内容
            data = await response.json()
            print(f"完成请求 URL-{url_id}, 状态码: {response.status}")
            return {
                'url_id': url_id,
                'status': response.status,
                'data': data
            }
    except asyncio.TimeoutError:
        print(f"URL-{url_id} 请求超时")
        return {'url_id': url_id, 'error': 'timeout'}
    except aiohttp.ClientError as e:
        print(f"URL-{url_id} 请求失败: {e}")
        return {'url_id': url_id, 'error': str(e)}

async def fetch_multiple_urls():
    """并发获取多个URL"""
    print("=== aiohttp并发请求演示 ===\n")

    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/json',
        'https://httpbin.org/uuid',
    ]

    start = time.time()

    # 创建共享session(重要:复用连接,提高性能)
    async with aiohttp.ClientSession() as session:
        # 创建并发任务
        tasks = [
            fetch_url(session, url, i)
            for i, url in enumerate(urls, 1)
        ]

        # 并发执行所有请求
        results = await asyncio.gather(*tasks, return_exceptions=True)

    elapsed = time.time() - start

    # 统计结果
    successful = sum(1 for r in results if isinstance(r, dict) and 'data' in r)
    print(f"\n总耗时: {elapsed:.2f}秒")
    print(f"成功: {successful}/{len(urls)}")
    print(f"如果顺序请求大约需要: {sum([1, 2, 1, 1])}秒")

# asyncio.run(fetch_multiple_urls())

关键点:

  1. 使用ClientSession: 创建 session 并在多个请求间复用,提高性能
  2. async with管理资源: 确保连接正确关闭
  3. 并发请求: 使用asyncio.gather()并发执行多个请求
  4. 错误处理: 捕获ClientErrorTimeoutError
  5. 超时控制: 使用ClientTimeout设置超时时间

5、协程与阻塞问题

在协程编程中,阻塞操作是最常见的性能陷阱。理解什么是阻塞、如何识别阻塞以及如何正确处理阻塞代码,是编写高效异步程序的关键。

5.1 阻塞操作的影响

在协程中使用阻塞操作会冻结整个事件循环,导致所有其他协程无法执行。

常见阻塞操作:

  1. time.sleep():完全阻塞事件循环
  2. requests.get():网络请求阻塞
  3. CPU 密集计算:长时间占用 CPU
  4. 文件同步操作:如open().read()

5.2 识别"假异步库"

一些库看起来异步,但内部使用同步操作,同样会阻塞事件循环。

识别方法:

  • 检查库的实现,看其内部是否使用了真正的异步 I/O
  • 测试库的并发性能
  • 查看库的文档说明

5.3 解决方案:在协程中运行同步代码

当必须使用同步代码(如使用了同步库或进行密集计算)时,不能直接调用,而应该将其放入线程池进程池中运行,以避免阻塞事件循环。

5.3.1 方案一: asyncio.to_thread() (推荐, Python 3.9+)

这是处理I/O 密集型阻塞操作(如文件读写、requests 请求)的最简单的现代方法。它本质上是将任务提交给默认的线程池。

python
import asyncio
import time
import threading

def blocking_io():
    """模拟阻塞I/O操作"""
    print(f"开始阻塞操作 (线程: {threading.current_thread().name})")
    time.sleep(1)  # 阻塞1秒
    return "完成"

async def main():
    print(f"主协程开始 (线程: {threading.current_thread().name})")

    # 自动在单独的线程中运行,不阻塞主循环
    result = await asyncio.to_thread(blocking_io)

    print(f"结果: {result}")

# asyncio.run(main())

5.3.2 方案二: loop.run_in_executor() (更灵活/CPU 密集型)

这是更底层的 API,允许你自定义执行器(Executor)。特别是对于CPU 密集型任务,必须使用ProcessPoolExecutor来规避 GIL 锁,实现真正的并行。

python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound_task(n):
    """模拟CPU密集型计算"""
    count = 0
    for i in range(n):
        count += i
    return count

async def main():
    loop = asyncio.get_running_loop()

    # 1. 使用默认线程池 (等同于 to_thread)
    # result1 = await loop.run_in_executor(None, blocking_io)

    # 2. 使用进程池 (推荐用于 CPU 密集型)
    # 注意: 创建进程池开销较大,通常复用
    with ProcessPoolExecutor() as pool:
        print("开始CPU密集型计算...")
        result = await loop.run_in_executor(pool, cpu_bound_task, 10**7)
        print(f"计算结果: {result}")

if __name__ == '__main__':
    # Windows下使用多进程需要保护入口
    asyncio.run(main())

5.3.3 方案对比与选择指南

场景推荐方案原理
I/O 密集型 (读写文件、同步网络请求)asyncio.to_thread()使用线程池。I/O 操作期间释放 GIL,多线程可并发。
CPU 密集型 (图像处理、复杂计算)run_in_executor + ProcessPoolExecutor使用进程池。每个进程有独立的解释器和 GIL,实现真并行。
兼容旧版本 (Python < 3.9)run_in_executor(None, ...)使用默认线程池运行。

6、异常、超时与取消

在异步编程中,异常处理、超时控制和任务取消是构建健壮应用程序的重要组成部分。

6.1 异常传播规则

在协程中,异常会按照特定的规则传播。当协程内部发生异常且未被捕获时,异常会向上层传播。

异常处理方式:

python
import asyncio

async def task_with_exception():
    raise ValueError("任务中发生异常")

async def main():
    try:
        await task_with_exception()
    except ValueError as e:
        print(f"捕获异常: {e}")

asyncio.run(main())

6.2 gather 的异常处理

asyncio.gather()在处理多个任务时有特定的异常处理行为。

两种异常处理策略:

  1. return_exceptions=False(默认):第一个异常立即传播
  2. return_exceptions=True:所有结果(包括异常)作为结果返回
python
import asyncio

async def success_task():
    await asyncio.sleep(0.1)
    return "成功"

async def failure_task():
    await asyncio.sleep(0.2)
    raise ValueError("失败")

async def main():
    # 使用return_exceptions=True收集异常
    results = await asyncio.gather(
        success_task(),
        failure_task(),
        return_exceptions=True
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务 {i} 异常: {result}")
        else:
            print(f"任务 {i} 结果: {result}")

asyncio.run(main())

6.3 超时控制

超时控制是防止任务无限期等待的重要机制。

超时机制的重要性:

  • 防止网络请求或 I/O 操作无响应
  • 避免程序挂起
  • 提高系统稳定性
python
import asyncio

async def slow_operation():
    await asyncio.sleep(3)
    return "慢操作完成"

async def main():
    try:
        # 使用超时控制
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("操作超时")

asyncio.run(main())

6.4 任务取消机制

任务取消是 asyncio 的重要功能,允许在运行时取消正在执行的任务。

取消工作原理:

  1. 调用任务的cancel()方法
  2. 任务内部抛出CancelledError异常
  3. 可以在协程中捕获此异常并进行清理工作
python
import asyncio

async def cancellable_task():
    try:
        print("任务开始")
        await asyncio.sleep(2)
        print("任务完成")
        return "任务完成"
    except asyncio.CancelledError:
        print("任务被取消,进行清理...")
        raise  # 重新抛出CancelledError

async def main():
    # 创建任务
    task = asyncio.create_task(cancellable_task())

    # 等待一小段时间后取消任务
    await asyncio.sleep(0.5)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("任务已取消")

asyncio.run(main())

7、同步原语

在并发编程中,同步原语用于协调多个协程对共享资源的访问,避免竞态条件。

7.1 异步锁(asyncio.Lock)

异步锁用于保护临界区,确保同一时间只有一个协程可以访问受保护的代码段。

异步锁的特点:

  • 与线程锁类似,但与 asyncio 事件循环兼容
  • 不会阻塞事件循环
  • 保证原子操作
python
import asyncio

# 共享资源
counter = 0
lock = asyncio.Lock()

async def increment():
    global counter
    async with lock:  # 保证原子操作
        temp = counter
        await asyncio.sleep(0.1)  # 模拟处理时间
        counter = temp + 1
        print(f"计数器: {counter}")

async def main():
    # 并发执行多个increment操作
    await asyncio.gather(increment(), increment(), increment())
    print(f"最终计数: {counter}")

asyncio.run(main())

7.2 异步信号量(asyncio.Semaphore)

异步信号量用于限制同时访问特定资源的协程数量。

应用场景:

  • 限制同时进行的网络连接数
  • 控制数据库连接池大小
  • 限制 API 请求频率
python
import asyncio

async def access_resource(name, semaphore):
    async with semaphore:  # 限制并发数
        print(f"{name} 获得资源访问权")
        await asyncio.sleep(1)  # 模拟资源使用时间
        print(f"{name} 释放资源")

async def main():
    # 限制最多2个协程同时访问资源
    semaphore = asyncio.Semaphore(2)

    tasks = [
        access_resource(f"任务-{i}", semaphore)
        for i in range(5)
    ]

    await asyncio.gather(*tasks)

asyncio.run(main())

7.3 异步队列(asyncio.Queue)

异步队列是协程间通信的重要工具,特别适用于生产者-消费者模式。

队列的异步特性:

  • 线程安全
  • 支持异步的 put 和 get 操作
  • 队列满时 put 会等待,队列空时 get 会等待
  • 这些操作都不会阻塞事件循环
python
import asyncio

async def producer(queue):
    """生产者:向队列添加数据"""
    for i in range(5):
        item = f"Item-{i}"
        await queue.put(item)
        print(f"生产: {item}")
        await asyncio.sleep(0.5)

async def consumer(queue, name):
    """消费者:从队列取出数据"""
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=2.0)
            print(f"{name} 消费: {item}")
            queue.task_done()
        except asyncio.TimeoutError:
            break

async def main():
    queue = asyncio.Queue()

    # 使用队列
    await asyncio.gather(
        producer(queue),
        consumer(queue, "消费者-1"),
        consumer(queue, "消费者-2")
    )

asyncio.run(main())

8、结构化并发

结构化并发(Structured Concurrency)是一种编程范式,它确保并发任务的生命周期得到正确管理,避免任务泄漏和异常处理问题。

8.1 结构化并发的核心概念

结构化并发(Structured Concurrency)是一种编程范式,旨在让并发代码像顺序代码一样清晰、可预测、可维护。它强调:子任务的生命周期必须被其父任务显式管理,避免"孤儿任务"(即无主、无法追踪的并发任务)。

为什么需要结构化并发?

传统并发(如线程、裸协程)常见问题:

  1. 任务无法追踪:任务启动后,父任务无法确认其完成或取消状态
  2. 异常丢失:子任务的异常无法自动传播回父上下文
  3. 资源泄漏:程序退出时可能残留后台任务,导致连接或文件句柄未释放

核心原则:

  • 作用域绑定: 子任务必须在父任务的作用域内启动和完成,不能"逃逸"到外部
  • 生命周期管理: 父任务结束时,自动取消所有未完成的子任务
  • 异常传播: 任一子任务出错,应取消其余子任务并向上抛出
  • 栈式结构: 作用域是栈式(stack-like)的,符合"先进后出"的控制流

8.1.1 反面教材:非结构化并发

python
import asyncio

async def child_task():
    print("子任务开始")
    await asyncio.sleep(1)
    print("子任务结束")

async def bad_example():
    print("父任务开始")
    # 启动任务但不等待 -> 成为孤儿!
    asyncio.create_task(child_task())
    print("父任务结束")
    # 程序可以在此退出,child_task 被静默丢弃
    # 导致:
    # 1. "子任务结束" 永远不会被打印
    # 2. 如果 child_task 中有资源清理代码,将不会执行

# asyncio.run(bad_example())

8.2 Structured Concurrency 实践 (TaskGroup)

Python 3.11+ 引入了 asyncio.TaskGroup(受 Trio 启发),提供了标准的结构化并发支持。

8.2.1 使用 TaskGroup

python
import asyncio
import time

async def child_task(id, delay):
    print(f"子任务 {id} 启动")
    await asyncio.sleep(delay)
    print(f"子任务 {id} 完成")
    return id

async def good_example():
    print("=== 结构化并发示例 ===")
    start = time.time()

    try:
        # async with 创建一个任务作用域
        async with asyncio.TaskGroup() as tg:
            # 在作用域内创建任务
            task1 = tg.create_task(child_task(1, 1))
            task2 = tg.create_task(child_task(2, 2))
            print("父任务已启动所有子任务")
        # 离开 async with 块时:
        # 1. 自动挂起父任务,等待所有子任务完成
        # 2. 如果发生异常,自动取消其他子任务

    except Exception as e:
        print(f"捕获异常: {e}")

    print(f"所有任务完成, 总耗时: {time.time() - start:.2f}秒")

# asyncio.run(good_example())

8.2.2 关键机制解析

机制说明
TaskGroup提供结构化作用域,管理子任务生命周期
自动等待 (Join)离开作用域时自动 await 所有子任务
自动取消 (Cancel)出现异常或提前退出时,自动取消其余任务
异常聚合多个异常可被收集 (Python 3.11+ 支持 ExceptionGroup)

9、什么时候不要用协程

尽管协程在 I/O 密集型任务中表现出色,但并不适合所有场景。

9.1 不适合协程的场景

1. CPU 密集型算法:

  • 数值计算、图像处理、加密解密
  • 需要持续占用 CPU,无法利用异步的优势

2. 实时控制/硬实时:

  • 需要精确时间控制的场景
  • 协程的调度可能导致时间不可预测

3. 需要多核并行:

  • 真正需要并行计算的场景
  • 协程在单线程中运行无法实现真正的并行

9.2 替代方案

对于 CPU 密集型任务,使用多进程是更好的选择:

python
import multiprocessing

def cpu_intensive_task(data):
    """CPU密集型任务"""
    result = 0
    for i in range(data * 1000000):
        result += i * i
    return result

if __name__ == '__main__':
    data_list = [10, 20, 30]
    with multiprocessing.Pool() as pool:
        results = pool.map(cpu_intensive_task, data_list)
    print(f"多进程处理结果: {results}")

场景选择指南:

场景类型推荐方案原因
I/O 密集型协程高并发,轻量级
CPU 密集型多进程利用多核,真正并行
混合型协程+多进程协程处理 I/O,进程处理 CPU
大量短任务协程创建销毁开销小
实时性要求高多线程更可预测的调度

总结

协程和 asyncio 是处理 I/O 密集型任务的强大工具,关键要点:

核心概念

  1. 理解本质:协程是协作式多任务,不是线程,运行在单线程+事件循环模型上
  2. 掌握机制:await是唯一让出点,事件循环负责调度
  3. 状态管理:理解协程的 5 种状态及其切换过程

最佳实践

  1. 避免阻塞:永远不要在协程中使用阻塞操作(time.sleep()requests等)
  2. 合理并发:使用asyncio.gather()等并发 API,而非顺序 await
  3. 正确处理同步代码:使用asyncio.to_thread()run_in_executor()
  4. 异常处理:正确处理异常、超时和取消
  5. 资源控制:使用信号量等同步原语控制并发度

适用场景

  • 推荐使用:I/O 密集型任务(网络请求、文件读写、数据库操作)
  • 不推荐使用:CPU 密集型任务(数值计算、图像处理等)
  • 替代方案:CPU 密集型任务使用多进程

关键记忆

概念要点
协程 vs 线程协作式调度 vs 抢占式调度
事件循环协程调度的核心,维护任务队列
await暂停协程,让出控制权的唯一机制
并发 APIgather/wait/create_task
同步原语Lock/Semaphore/Queue
适用场景I/O 密集型任务,非 CPU 密集型
性能陷阱阻塞操作、CPU 密集型、顺序 await

邬东升的博客