这是一个为你定制的AnyIO 深度学习计划。
为什么要学 AnyIO?
如果你已经了解了 asyncio,你会发现 asyncio 的 API 有时比较混乱(低级 API 和高级 API 混杂)。AnyIO 是建立在 asyncio 和 trio 之上的兼容层,它强制使用结构化并发 (Structured Concurrency),代码更安全、更简洁,且能无缝切换底层后端。FastAPI 的底层其实就是 AnyIO。
针对于异步方法的代码实现逻辑可以参考我的上一篇博客:https://blog.csdn.net/wang602125218/article/details/156025270?spm=1011.2415.3001.5331
🗺️ 总览:从 asyncio 迁移到 AnyIO (2周计划)
- 阶段一:思维重构 (Day 1-3)—— 强制性的结构化并发(TaskGroup)。
- 阶段二:更优雅的超时与取消 (Day 4-6)—— Scope(作用域)机制。
- 阶段三:高层 I/O 抽象 (Day 7-10)—— Stream(流)与异步文件系统。
- 阶段四:跨平台与工程化 (Day 11-14)—— 兼容性测试与 Subprocesses。
🟢 第一阶段:思维重构 - 结构化并发 (Day 1-3)
核心差异:在 AnyIO 中,不允许像asyncio.create_task()那样“发射后不管 (fire-and-forget)”地创建游离任务。所有任务必须由一个“父节点” (TaskGroup) 管理,父节点会等待所有子节点结束。
1. 核心知识点
- 入口:
anyio.run(main)。 - TaskGroup (任务组):这是 AnyIO 的灵魂。所有并发任务必须在
async with anyio.create_task_group() as tg:块中启动。 - tg.start_soon():替代
asyncio.create_task()。
2. 代码对比:asyncio vs AnyIO
🔄asyncio 风格 (容易产生游离任务)
# 传统的 asyncio 允许任务在后台悄悄跑,报错了可能都不知道 task = asyncio.create_task(background_work())✅AnyIO 风格 (严格的父子关系)
from anyio import create_task_group, run, sleep async def worker(name, seconds): print(f"[{name}] 开始工作") await sleep(seconds) print(f"[{name}] 完成") async def main(): print("主程序开始") # 核心:必须使用 TaskGroup async with create_task_group() as tg: # 这里的任务被 tg 管理 tg.start_soon(worker, "A", 1) tg.start_soon(worker, "B", 2) print("任务已安排,等待它们全部结束...") # 注意:只有当 with 块里所有的任务都结束后,代码才会执行到这里 print("所有任务都结束了!主程序退出") if __name__ == "__main__": run(main)🔵 第二阶段:更优雅的超时与取消 (Day 4-6)
核心差异:asyncio的超时通常作用于await语句上。AnyIO 引入了Scope (作用域)的概念,可以在一段代码块上设置超时,而且不会抛出异常(如果不需要的话)。
1. 核心知识点
- Cancel Scope (取消作用域):可以手动取消的一块代码区域。
- move_on_after:超时后不报错,只是跳过代码块剩余部分,继续往下执行(非常实用!)。
- fail_after:超时后抛出
TimeoutError(类似asyncio.wait_for)。
2. 实战代码:优雅的超时处理
import anyio async def slow_operation(): print("开始慢速操作...") await anyio.sleep(5) # 模拟耗时 print("慢速操作完成 (这一行不应该被打印)") return "Success" async def main(): # 场景1:尝试执行,如果超时就算了,不要报错炸得整个程序崩溃 with anyio.move_on_after(2) as scope: await slow_operation() if scope.cancelled_caught: print("场景1:操作超时了,但我选择忽略并继续") # 场景2:必须在规定时间内完成,否则报错 try: with anyio.fail_after(1): await slow_operation() except TimeoutError: print("场景2:操作超时,捕获异常") if __name__ == "__main__": anyio.run(main)🟠 第三阶段:高层 I/O 抽象 (Day 7-10)
核心差异:AnyIO 提供了比 Socket 更高级的抽象,称为Streams (流),以及完全异步的pathlib替代品。
1. 核心知识点
- ByteStream:用于 TCP/UNIX 套接字。包含
send(),receive(),aclose()。 - ObjectStream:可以在流中直接发送 Python 对象(序列化)。
- anyio.Path:异步版本的文件路径操作,API 和标准库
pathlib几乎一样。
2. 实战代码:异步文件操作
from anyio import Path, run async def main(): # 使用 anyio.Path,完全异步,不阻塞 Loop path = Path("data.txt") # 写入 async with await path.open("w") as f: await f.write("Hello AnyIO\n") # 读取 if await path.exists(): content = await path.read_text() print(f"文件内容: {content.strip()}") # 遍历目录 async for p in Path(".").glob("*.py"): print(f"发现脚本: {p.name}") if __name__ == "__main__": run(main)🟣 第四阶段:跨平台与工程化 (Day 11-14)
目标:掌握 AnyIO 的核心优势——兼容性与测试。
1. 知识点
- Backend 选择:AnyIO 可以运行在
asyncio或trio之上。 - Synchronization:
anyio.Event,anyio.Lock,anyio.Semaphore(用法与 asyncio 类似,但更安全)。 - Subprocesses:
anyio.run_process,比 asyncio 的 subprocess 好用太多。
2. 测试配置 (pytest)
AnyIO 有专门的 pytest 插件。
# pip install pytest-trio (如果你想测 trio 后端) # 你的测试文件 test_main.py import pytest import anyio @pytest.mark.anyio async def test_sleep(): start = anyio.current_time() await anyio.sleep(0.1) assert anyio.current_time() - start >= 0.1🎓 最终挑战题目:智能日志批量处理器
场景描述: 假设你有一个日志目录(./logs),里面会不断生成一些.txt日志文件。你需要编写一个程序,模拟“生产者”生成日志,同时启动“消费者”并发处理这些日志。
核心考点:
- 文件 I/O:使用
anyio.Path进行文件的读写和删除。 - 结构化并发:使用
TaskGroup同时运行“生成器”和“处理器”。 - 超时控制:模拟某些日志处理非常慢(卡死),必须使用
move_on_after/fail_after跳过它,不能阻塞其他文件的处理。
具体需求
- 准备工作:
- 程序启动时,确保
./logs目录存在(如果不存在则创建)。
- 程序启动时,确保
- 角色 A:日志生成器 (Generator)
- 每隔 0.5 秒生成一个
.txt文件。 - 文件名格式:
log_1.txt,log_2.txt... - 文件内容:写入一行随机文本(例如 "Task Data")。
- 要求:生成 5 个文件后停止。
- 每隔 0.5 秒生成一个
- 角色 B:日志处理器 (Processor)
- 这是一个持续运行的任务,不断扫描
./logs目录。 - 一旦发现文件,开启一个子任务去处理它(读取内容 -> 打印内容 -> 删除文件)。
- 难点(模拟故障):
- 这是一个持续运行的任务,不断扫描
- 在处理逻辑中加入随机延时。如果是偶数编号的文件(如
log_2.txt),让它sleep(3)秒(模拟卡死)。如果是奇数,只sleep(0.5)秒。 - 强制要求:每个文件的处理时间限制为1 秒。如果超过 1 秒没处理完,必须强制取消该文件的处理,打印“处理超时,跳过文件”,并保留该文件不删除(作为错误留档)。
- 在处理逻辑中加入随机延时。如果是偶数编号的文件(如
- 程序出口:
- 当生成器完成任务,且目录中所有能处理的文件都处理完(或超时跳过)后,主程序结束。
import anyio # 创建目录 LOG_DIR = anyio.Path("./logs") async def path_exists(): # 判断目录是否存在 if not await LOG_DIR.exists(): await LOG_DIR.mkdir() async def single_task_generate(): # 生成器逻辑 for i in range(1, 6): # 生成文件名 file_name = LOG_DIR / f"log_{i}.txt" # 写入内容 async with await file_name.open("w") as f: await f.write(f"Task Data {i}") await anyio.sleep(0.5) async def single_task_processor(file: anyio.Path, process_files: set): # 获取文件名 file_name = file.name try: with anyio.fail_after(1): # 如果是偶数编号,sleep(3)模拟卡死 if int(file_name.split(".")[0][-1]) % 2 == 0: await anyio.sleep(3) else: await anyio.sleep(0.5) # 处理文件 - 读取 - 打印 - 删除 # 读取 content = await file.read_text() # 打印 print(f"Processing: {file.name} Content: {content}") # 删除 await file.unlink() except TimeoutError: print(f"处理超时,跳过文件{file_name}") async def main(): # 保证文件存在 await path_exists() # 用于记录已经处理过的文件 processed_files = set() # 创建taskgroup用于管理任务 async with anyio.create_task_group() as tg: # 启动生成任务 tg.start_soon(single_task_generate) # 启动删除任务 # 后台检查实现10s start_time = anyio.current_time() while anyio.current_time() - start_time < 10: # 遍历文件 async for p in LOG_DIR.glob("*.txt"): if p.name not in processed_files: # 使用了就直接记录,不能放到异步任务中进行记录 processed_files.add(p.name) tg.start_soon(single_task_processor, p, processed_files) await anyio.sleep(0.2) if __name__ == "__main__": try: anyio.run(main) except KeyboardInterrupt: pass💡 学习建议:AnyIO vs Asyncio 怎么选?
- 写应用层业务 (Web Server, 爬虫):首选AnyIO。它的
TaskGroup能帮你避免很多内存泄漏和僵尸任务的问题,而且 API 设计更符合人类直觉。 - 写底层库:可能需要用asyncio,因为它是标准库,依赖最少。
- FastAPI 用户:你已经在用 AnyIO 了,FastAPI 的
async def路由就是跑在 AnyIO 上的。