2. Python 异步编程
1. 事件循环与底层模型
事件循环(Event Loop)是异步编程的核心机制,负责调度和执行协程任务。它本质上是一个持续运行的循环,不断检测和执行任务。
事件循环工作原理
事件循环采用 选择器模式(Selector Pattern) 监控 I/O 事件,当协程遇到 I/O 操作时会主动挂起,将控制权交还给事件循环,让其调度其他就绪的任务。
事件循环的工作原理可以用以下伪代码来描述:
# 伪代码
任务列表 = [ 任务1, 任务2, 任务3,... ]
while True:
可执行的任务列表, 已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回
for 就绪任务 in 可执行的任务列表:
执行已就绪的任务
for 已完成的任务 in 已完成的任务列表:
在任务列表中移除 已完成的任务
如果 任务列表 中的任务都已完成,则终止循环import asyncio
# 生成事件循环(Python 3.10+ 不推荐直接获取)
loop = asyncio.get_event_loop()
# 将任务放入事件循环并运行直到完成
loop.run_until_complete(协程对象)
# Python 3.7+ 推荐使用更简洁的方式
asyncio.run(main())可等待对象体系 (Awaitable Types)
在 Python 异步编程中,只有 可等待对象 (Awaitable) 才能放在 await 关键字后面。理解这个类型体系对于编写正确的异步代码至关重要。
- Coroutine(协程):由
async def定义的函数调用后返回的对象,包含实际的异步逻辑代码。 - Future:表示一个异步操作的最终结果,是底层对象,通常不直接使用。
- Task:Future 的子类,将协程包装成可调度的任务,是最常用的并发执行单元。
2. 基础操作
协程定义与运行
使用 async def 定义协程函数,调用时返回协程对象。注意:调用协程函数不会立即执行函数体,只有提交到事件循环后才会执行。
# 定义协程函数
async def func():
print("Hello from coroutine")
return "result"
# 调用协程函数获取协程对象(此时函数体未执行)
coro_obj = func()
# 运行协程(函数体在此时才真正执行)
result = asyncio.run(coro_obj)3. await 关键字与并发控制
await 基础用法
await 用于等待可等待对象(协程、Task、Future)完成。当遇到 await 时,当前协程会挂起,控制权交回事件循环。
import asyncio
async def others():
print(" [others] Start others")
await asyncio.sleep(1) # 遇到IO,会切换去执行其他任务
print(" [others] End others")
return "Result from others"
async def func():
print("[func] Start func")
await asyncio.sleep(2) # 遇到IO,会切换去执行其他任务
print("[func] End func")
return "Result from func"
async def main():
# 创建task对象,将当前执行的函数func()和others()加入事件循环中
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(others())
# 等待两个任务完成
result1 = await task1
result2 = await task2
print(f"\n结果: {result1}, {result2}")
asyncio.run(main())
# 执行顺序:
# 1. [func] Start func -> 遇到 sleep(2),切换到 task2
# 2. [others] Start others -> 遇到 sleep(1),切换
# 3. 1秒后 others 的 sleep 完成 -> [others] End others
# 4. 2秒后 func 的 sleep 完成 -> [func] End func
# 最后输出结果:
# [func] Start func
# [others] Start others
# [others] End others
# [func] End func
# 结果: Result from func, Result from others4. Task 对象与并发任务管理
创建 Task
Tasks 用于并发调度协程,通过 asyncio.create_task(协程对象) 的方式创建 Task 对象,这样可以让协程加入到事件循环中进行调度执行。
白话:在事件循环中添加多个任务
import asyncio
async def func():
print("Start func")
await asyncio.sleep(1)
print("End func")
return "Result from func"
async def main():
print("Creating task...")
task_list = [
asyncio.create_task(func(), name="n1"),
asyncio.create_task(func(), name="n2"),
]
print("Tasks created, waiting for results...")
done, pending = await asyncio.wait(task_list)
print(done)
asyncio.run(main())import asyncio
async def func():
print("Start func")
await asyncio.sleep(1)
print("End func")
return "Result from func"
task_list = [
func(),
func(),
]
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)asyncio.Future 对象
task 对象是继承Future对象,task对象内部await结果的处理基于Future对象来实现的。
concurrent.futures.Future 对象
使用线程池、进程池实现异步操作时用到的对象。
异步迭代器
异步上下文管理器
此种对象通过定义 __aenter__ 和 __aexit__ 方法来对 async with 语句中的环境进行控制。
import asyncio
class AsyncContextManager:
async def __init__(self):
pass
async def do_something(self):
print("Doing something asynchronously")
async def __aenter__(self):
# 操作之前先打开
print("Entering async context")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 操作之后再关闭
print("Exiting async context")
async def main():
# async with 必须放在协程函数内, manager 是__aenter__返回的对象
async with AsyncContextManager() as manager:
result = await manager.do_something()
print(result)
asyncio.run(main())5. 高级并发控制
asyncio.gather - 聚合多个任务
asyncio.gather 是最常用的并发执行工具,可以同时运行多个协程并收集所有结果。
import asyncio
async def fetch_data(id, delay):
print(f"任务 {id} 开始")
await asyncio.sleep(delay)
print(f"任务 {id} 完成")
return f"Result {id}"
async def main():
# 并发运行多个协程,按顺序返回结果
results = await asyncio.gather(
fetch_data(1, 2), fetch_data(2, 1), fetch_data(3, 1.5)
)
print(f"所有结果: {results}") # ['Result 1', 'Result 2', 'Result 3']
asyncio.run(main())gather 的错误处理
默认情况下,gather 中任何一个任务抛出异常会立即传播,但其他任务会继续运行。设置 return_exceptions=True 可以将异常作为结果返回。
async def task_with_error(id):
await asyncio.sleep(1)
if id == 2:
raise ValueError(f"任务 {id} 失败")
return f"成功 {id}"
async def main():
# 错误处理方式1:默认行为(抛出异常)
try:
results = await asyncio.gather(
task_with_error(1), task_with_error(2), task_with_error(3)
)
except ValueError as e:
print(f"捕获到错误: {e}")
# 错误处理方式2:将异常作为结果返回
results = await asyncio.gather(
task_with_error(1),
task_with_error(2),
task_with_error(3),
return_exceptions=True,
)
for i, result in enumerate(results, 1):
if isinstance(result, Exception):
print(f"任务 {i} 错误: {result}")
else:
print(f"任务 {i} 结果: {result}")
asyncio.run(main())asyncio.TaskGroup - Python 3.11+
TaskGroup 是结构化并发的新工具,提供更安全的任务管理。当上下文管理器退出时,会自动等待所有任务完成。
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_data(1, 1))
task2 = tg.create_task(fetch_data(2, 2))
# 离开上下文时自动等待所有任务
# 此时所有任务已完成
print(f"任务1结果: {task1.result()}")
print(f"任务2结果: {task2.result()}")超时控制 (Timeout)
使用 asyncio.wait_for 为任务设置超时时间,防止任务无限期等待。
async def slow_task():
await asyncio.sleep(5)
return "完成"
async def main():
try:
# 设置2秒超时
result = await asyncio.wait_for(slow_task(), timeout=2.0)
print(result)
except asyncio.TimeoutError:
print("任务超时!")
asyncio.run(main())任务取消 (Cancellation)
任务可以被显式取消,协程会收到 CancelledError 异常。
async def long_running():
try:
print("任务开始...")
await asyncio.sleep(10)
print("任务完成")
except asyncio.CancelledError:
print("任务被取消")
raise # 最佳实践:重新抛出
async def main():
task = asyncio.create_task(long_running())
await asyncio.sleep(1)
# 取消任务
task.cancel()
try:
await task
except asyncio.CancelledError:
print("在 main 中捕获到取消")
asyncio.run(main())信号量 (Semaphore) - 限制并发数
当需要限制同时运行的任务数量时(如限制 API 请求速率),使用 asyncio.Semaphore。
信号量维护一个内部计数器,表示可用的"槽位"数量:
async def limited_task(sem, id):
async with sem: # 获取信号量(计数器-1)
print(f"任务 {id} 开始执行")
await asyncio.sleep(2)
print(f"任务 {id} 完成")
# 退出上下文时释放(计数器+1)
async def main():
sem = asyncio.Semaphore(3) # 最多同时运行3个任务
tasks = [limited_task(sem, i) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())6. 与同步代码交互
run_in_executor - 运行阻塞代码
在异步程序中运行 CPU 密集型或阻塞 I/O 操作时,需要使用执行器(Executor)避免阻塞事件循环。
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def blocking_io(name):
"""模拟阻塞 I/O(如 requests 库)"""
print(f"{name} 开始阻塞操作")
time.sleep(2) # 阻塞操作
return f"{name} 完成"
def cpu_bound(n):
"""CPU 密集型任务"""
return sum(i * i for i in range(n))
async def main():
loop = asyncio.get_running_loop()
# 方式1:使用默认线程池执行器(适用于 I/O 阻塞)
result1 = await loop.run_in_executor(None, blocking_io, "任务1")
print(result1)
# 方式2:使用自定义线程池
with ThreadPoolExecutor(max_workers=3) as pool:
result2 = await loop.run_in_executor(pool, blocking_io, "任务2")
# 方式3:使用进程池(适用于 CPU 密集型)
with ProcessPoolExecutor() as pool:
result3 = await loop.run_in_executor(pool, cpu_bound, 10**6)
print(f"计算结果: {result3}")
asyncio.run(main())asyncio.to_thread - Python 3.9+
更简洁的方式在线程中运行同步函数。
async def main():
# 自动在独立线程中运行
result = await asyncio.to_thread(blocking_io, "任务")
print(result)7. 常用函数速查表
| 函数 | 功能描述 | 使用场景 |
|---|---|---|
asyncio.run(coro) | 运行协程的入口点 | 程序主入口 |
asyncio.create_task(coro) | 创建 Task 对象 | 并发执行多个协程 |
asyncio.gather(*aws) | 聚合运行多个可等待对象 | 需要所有结果 |
asyncio.wait_for(aw, timeout) | 为任务设置超时 | 防止任务无限等待 |
asyncio.sleep(delay) | 异步休眠 | 模拟 I/O 延迟 |
asyncio.to_thread(func) | 在线程中运行同步函数 | 执行阻塞 I/O |
loop.run_in_executor(pool, func) | 在执行器中运行 | CPU 密集型/阻塞操作 |
asyncio.current_task() | 获取当前任务 | 调试和日志 |
asyncio.get_running_loop() | 获取运行中的事件循环 | 底层操作 |
asyncio.Semaphore(n) | 创建信号量 | 限制并发数 |
8. 最佳实践总结
避免阻塞事件循环:不要在协程中使用
time.sleep()、requests等同步阻塞调用,应使用asyncio.sleep()、aiohttp等异步版本。正确处理异常:使用
try/except捕获协程中的异常,在gather中使用return_exceptions=True收集所有结果。合理使用 Task:需要并发执行时才创建 Task,单个协程直接
await即可。限制并发数:使用 Semaphore 控制并发,避免资源耗尽(如数据库连接池、API 速率限制)。
优雅关闭:使用
task.cancel()取消任务,在协程中正确处理CancelledError。