Skip to content

2. Python 异步编程

1. 事件循环与底层模型

事件循环(Event Loop)是异步编程的核心机制,负责调度和执行协程任务。它本质上是一个持续运行的循环,不断检测和执行任务。

事件循环工作原理

事件循环采用 选择器模式(Selector Pattern) 监控 I/O 事件,当协程遇到 I/O 操作时会主动挂起,将控制权交还给事件循环,让其调度其他就绪的任务。

事件循环的工作原理可以用以下伪代码来描述:

text
# 伪代码

任务列表 = [ 任务1, 任务2, 任务3,... ]

while True:
    可执行的任务列表, 已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回

    for 就绪任务 in 可执行的任务列表:
        执行已就绪的任务

    for 已完成的任务 in 已完成的任务列表:
        在任务列表中移除 已完成的任务

    如果 任务列表 中的任务都已完成,则终止循环
python
import asyncio

# 生成事件循环(Python 3.10+ 不推荐直接获取)
loop = asyncio.get_event_loop()

# 将任务放入事件循环并运行直到完成
loop.run_until_complete(协程对象)

# Python 3.7+ 推荐使用更简洁的方式
asyncio.run(main())

可等待对象体系 (Awaitable Types)

在 Python 异步编程中,只有 可等待对象 (Awaitable) 才能放在 await 关键字后面。理解这个类型体系对于编写正确的异步代码至关重要。

  • Coroutine(协程):由 async def 定义的函数调用后返回的对象,包含实际的异步逻辑代码。
  • Future:表示一个异步操作的最终结果,是底层对象,通常不直接使用。
  • Task:Future 的子类,将协程包装成可调度的任务,是最常用的并发执行单元。

2. 基础操作

协程定义与运行

使用 async def 定义协程函数,调用时返回协程对象。注意:调用协程函数不会立即执行函数体,只有提交到事件循环后才会执行。

python
# 定义协程函数
async def func():
    print("Hello from coroutine")
    return "result"


# 调用协程函数获取协程对象(此时函数体未执行)
coro_obj = func()

# 运行协程(函数体在此时才真正执行)
result = asyncio.run(coro_obj)

3. await 关键字与并发控制

await 基础用法

await 用于等待可等待对象(协程、Task、Future)完成。当遇到 await 时,当前协程会挂起,控制权交回事件循环。

python
import asyncio


async def others():
    print("  [others] Start others")
    await asyncio.sleep(1)  # 遇到IO,会切换去执行其他任务
    print("  [others] End others")
    return "Result from others"


async def func():
    print("[func] Start func")
    await asyncio.sleep(2)  # 遇到IO,会切换去执行其他任务
    print("[func] End func")
    return "Result from func"


async def main():
    # 创建task对象,将当前执行的函数func()和others()加入事件循环中
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(others())

    # 等待两个任务完成
    result1 = await task1
    result2 = await task2

    print(f"\n结果: {result1}, {result2}")


asyncio.run(main())

# 执行顺序:
# 1. [func] Start func -> 遇到 sleep(2),切换到 task2
# 2. [others] Start others -> 遇到 sleep(1),切换
# 3. 1秒后 others 的 sleep 完成 -> [others] End others
# 4. 2秒后 func 的 sleep 完成 -> [func] End func

# 最后输出结果:
# [func] Start func
# [others] Start others
# [others] End others
# [func] End func

# 结果: Result from func, Result from others

4. Task 对象与并发任务管理

创建 Task

Tasks 用于并发调度协程,通过 asyncio.create_task(协程对象) 的方式创建 Task 对象,这样可以让协程加入到事件循环中进行调度执行。

白话:在事件循环中添加多个任务

python
import asyncio


async def func():
    print("Start func")
    await asyncio.sleep(1)
    print("End func")
    return "Result from func"


async def main():
    print("Creating task...")

    task_list = [
        asyncio.create_task(func(), name="n1"),
        asyncio.create_task(func(), name="n2"),
    ]

    print("Tasks created, waiting for results...")

    done, pending = await asyncio.wait(task_list)
    print(done)


asyncio.run(main())
python
import asyncio


async def func():
    print("Start func")
    await asyncio.sleep(1)
    print("End func")
    return "Result from func"


task_list = [
    func(),
    func(),
]

done, pending = asyncio.run(asyncio.wait(task_list))
print(done)

asyncio.Future 对象

task 对象是继承Future对象,task对象内部await结果的处理基于Future对象来实现的。

concurrent.futures.Future 对象

使用线程池、进程池实现异步操作时用到的对象。

异步迭代器

异步上下文管理器

此种对象通过定义 __aenter____aexit__ 方法来对 async with 语句中的环境进行控制。

python
import asyncio


class AsyncContextManager:
    async def __init__(self):
        pass

    async def do_something(self):
        print("Doing something asynchronously")

    async def __aenter__(self):
        # 操作之前先打开
        print("Entering async context")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 操作之后再关闭
        print("Exiting async context")


async def main():
    # async with 必须放在协程函数内, manager 是__aenter__返回的对象
    async with AsyncContextManager() as manager:
        result = await manager.do_something()
        print(result)


asyncio.run(main())

5. 高级并发控制

asyncio.gather - 聚合多个任务

asyncio.gather 是最常用的并发执行工具,可以同时运行多个协程并收集所有结果。

python
import asyncio


async def fetch_data(id, delay):
    print(f"任务 {id} 开始")
    await asyncio.sleep(delay)
    print(f"任务 {id} 完成")
    return f"Result {id}"


async def main():
    # 并发运行多个协程,按顺序返回结果
    results = await asyncio.gather(
        fetch_data(1, 2), fetch_data(2, 1), fetch_data(3, 1.5)
    )
    print(f"所有结果: {results}")  # ['Result 1', 'Result 2', 'Result 3']


asyncio.run(main())

gather 的错误处理

默认情况下,gather 中任何一个任务抛出异常会立即传播,但其他任务会继续运行。设置 return_exceptions=True 可以将异常作为结果返回。

python
async def task_with_error(id):
    await asyncio.sleep(1)
    if id == 2:
        raise ValueError(f"任务 {id} 失败")
    return f"成功 {id}"


async def main():
    # 错误处理方式1:默认行为(抛出异常)
    try:
        results = await asyncio.gather(
            task_with_error(1), task_with_error(2), task_with_error(3)
        )
    except ValueError as e:
        print(f"捕获到错误: {e}")

    # 错误处理方式2:将异常作为结果返回
    results = await asyncio.gather(
        task_with_error(1),
        task_with_error(2),
        task_with_error(3),
        return_exceptions=True,
    )

    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"任务 {i} 错误: {result}")
        else:
            print(f"任务 {i} 结果: {result}")


asyncio.run(main())

asyncio.TaskGroup - Python 3.11+

TaskGroup 是结构化并发的新工具,提供更安全的任务管理。当上下文管理器退出时,会自动等待所有任务完成。

python
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data(1, 1))
        task2 = tg.create_task(fetch_data(2, 2))
        # 离开上下文时自动等待所有任务

    # 此时所有任务已完成
    print(f"任务1结果: {task1.result()}")
    print(f"任务2结果: {task2.result()}")

超时控制 (Timeout)

使用 asyncio.wait_for 为任务设置超时时间,防止任务无限期等待。

python
async def slow_task():
    await asyncio.sleep(5)
    return "完成"


async def main():
    try:
        # 设置2秒超时
        result = await asyncio.wait_for(slow_task(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("任务超时!")


asyncio.run(main())

任务取消 (Cancellation)

任务可以被显式取消,协程会收到 CancelledError 异常。

python
async def long_running():
    try:
        print("任务开始...")
        await asyncio.sleep(10)
        print("任务完成")
    except asyncio.CancelledError:
        print("任务被取消")
        raise  # 最佳实践:重新抛出


async def main():
    task = asyncio.create_task(long_running())
    await asyncio.sleep(1)

    # 取消任务
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("在 main 中捕获到取消")


asyncio.run(main())

信号量 (Semaphore) - 限制并发数

当需要限制同时运行的任务数量时(如限制 API 请求速率),使用 asyncio.Semaphore

信号量维护一个内部计数器,表示可用的"槽位"数量:

Available=InitialAcquired+Released\text{Available} = \text{Initial} - \text{Acquired} + \text{Released}

python
async def limited_task(sem, id):
    async with sem:  # 获取信号量(计数器-1)
        print(f"任务 {id} 开始执行")
        await asyncio.sleep(2)
        print(f"任务 {id} 完成")
    # 退出上下文时释放(计数器+1)


async def main():
    sem = asyncio.Semaphore(3)  # 最多同时运行3个任务

    tasks = [limited_task(sem, i) for i in range(10)]
    await asyncio.gather(*tasks)


asyncio.run(main())

6. 与同步代码交互

run_in_executor - 运行阻塞代码

在异步程序中运行 CPU 密集型或阻塞 I/O 操作时,需要使用执行器(Executor)避免阻塞事件循环。

python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def blocking_io(name):
    """模拟阻塞 I/O(如 requests 库)"""
    print(f"{name} 开始阻塞操作")
    time.sleep(2)  # 阻塞操作
    return f"{name} 完成"


def cpu_bound(n):
    """CPU 密集型任务"""
    return sum(i * i for i in range(n))


async def main():
    loop = asyncio.get_running_loop()

    # 方式1:使用默认线程池执行器(适用于 I/O 阻塞)
    result1 = await loop.run_in_executor(None, blocking_io, "任务1")
    print(result1)

    # 方式2:使用自定义线程池
    with ThreadPoolExecutor(max_workers=3) as pool:
        result2 = await loop.run_in_executor(pool, blocking_io, "任务2")

    # 方式3:使用进程池(适用于 CPU 密集型)
    with ProcessPoolExecutor() as pool:
        result3 = await loop.run_in_executor(pool, cpu_bound, 10**6)
        print(f"计算结果: {result3}")


asyncio.run(main())

asyncio.to_thread - Python 3.9+

更简洁的方式在线程中运行同步函数。

python
async def main():
    # 自动在独立线程中运行
    result = await asyncio.to_thread(blocking_io, "任务")
    print(result)

7. 常用函数速查表

函数功能描述使用场景
asyncio.run(coro)运行协程的入口点程序主入口
asyncio.create_task(coro)创建 Task 对象并发执行多个协程
asyncio.gather(*aws)聚合运行多个可等待对象需要所有结果
asyncio.wait_for(aw, timeout)为任务设置超时防止任务无限等待
asyncio.sleep(delay)异步休眠模拟 I/O 延迟
asyncio.to_thread(func)在线程中运行同步函数执行阻塞 I/O
loop.run_in_executor(pool, func)在执行器中运行CPU 密集型/阻塞操作
asyncio.current_task()获取当前任务调试和日志
asyncio.get_running_loop()获取运行中的事件循环底层操作
asyncio.Semaphore(n)创建信号量限制并发数

8. 最佳实践总结

  1. 避免阻塞事件循环:不要在协程中使用 time.sleep()requests 等同步阻塞调用,应使用 asyncio.sleep()aiohttp 等异步版本。

  2. 正确处理异常:使用 try/except 捕获协程中的异常,在 gather 中使用 return_exceptions=True 收集所有结果。

  3. 合理使用 Task:需要并发执行时才创建 Task,单个协程直接 await 即可。

  4. 限制并发数:使用 Semaphore 控制并发,避免资源耗尽(如数据库连接池、API 速率限制)。

  5. 优雅关闭:使用 task.cancel() 取消任务,在协程中正确处理 CancelledError

构建时间:1/4/2026, 4:04:11 PM | 本博客内容均为自己学习,如内容涉及侵权,请联系邮箱:pangzl0215@163.com