47 ︳Python线程池模式:ThreadPoolExecutor与进程池/异步队列实战

张开发
2026/6/4 15:45:31 15 分钟阅读
47 ︳Python线程池模式:ThreadPoolExecutor与进程池/异步队列实战
文章目录摘要SEO 摘要目录开篇核心知识点1. 并发编程的核心问题:GIL 与并发模型选择2. concurrent.futures:线程池与进程池3. ThreadPoolExecutor:I/O 密集型任务4. ProcessPoolExecutor:CPU 密集型任务5. asyncio.Queue:异步队列设计6. 限流器:令牌桶与滑动窗口7. 有界队列 vs 无界队列实战案例:高并发爬虫系统常见错误与避坑指南错误1:混淆 I/O 密集和 CPU 密集任务错误2:创建过多并发单元错误3:忘记在 asyncio 中使用非阻塞操作错误4:线程不安全的共享状态错误5:忽略队列的背压机制术语注释面试高频问答深度扩展扩展话题1:asyncio.Semaphore 并发控制扩展话题2:proactors 模式扩展话题3:weakref 与资源清理系列总结(第47章预告)版权声明专栏定位:Python 工程化进阶(第47章)适读人群:后端工程师、技术负责人、架构师摘要一个爬虫服务在单机上抓取 10000 个页面,用普通 for 循环需要 10000 秒(假设每个页面 1 秒)。如果改成 50 个并发协程,只需要 200 秒——性能提升 50 倍。这就是并发编程的价值:在 I/O 密集型任务中,通过同时处理多个请求来充分利用等待时间。但并发不是银弹:线程安全问题、GIL 限制、锁竞争、死锁——每一个都是坑。本文将从并发编程的核心概念出发,系统讲解concurrent.futures的线程池/进程池使用、asyncio.Queue异步队列设计、限流器实现、以及有界/无界队列的选择策略。通过本文,你将能够根据任务类型(CPU 密集型 vs I/O 密集型)选择正确的并发方案,并避免常见并发陷阱。SEO 摘要Python 高并发编程实战。深入讲解 concurrent.futures 线程池/进程池、ProcessPoolExecutor/ThreadPoolExecutor 使用场景、asyncio.Queue 异步队列、限流器设计(令牌桶/滑动窗口)、有界队列与无界队列对比、GIL 与并发选择。通过爬虫系统、批量处理、限流 API 等实战案例,提供完整可运行的代码。目录并发编程的核心问题:GIL 与并发模型选择concurrent.futures:线程池与进程池ThreadPoolExecutor:I/O 密集型任务ProcessPoolExecutor:CPU 密集型任务asyncio.Queue:异步队列设计限流器:令牌桶与滑动窗口有界队列 vs 无界队列实战案例:高并发爬虫系统常见错误与避坑指南术语注释面试高频问答深度扩展开篇我见过一个场景:工程师用 Python 的threading模块写了一个爬虫,创建了 1000 个线程去并发抓取网页。结果程序运行几分钟后就崩溃了——不是因为爬虫本身的问题,而是因为创建太多线程导致系统资源耗尽:每个线程默认占用 8MB 栈空间,1000 个线程就是 8GB 内存。这就是"并发陷阱":你以为并发是免费的,实际上每个并发单元都有成本。线程有栈空间成本、上下文切换成本、同步成本;进程有进程创建成本、进程间通信成本。理解这些成本,才能正确选择并发方案。并发模型的选择决策树:任务类型 ├── I/O 密集型(网络请求、文件读写、数据库查询) │ ├── 并发数小(100):asyncio(推荐) │ ├── 并发数中等(100-1000):ThreadPoolExecutor │ └── 并发数大(1000):asyncio + 连接池 │ └── CPU 密集型(计算、加密、压缩、图像处理) ├── 任务独立、数目少:ProcessPoolExecutor └── 任务依赖、数目多:multiprocessing.Queue + 进程池核心知识点1. 并发编程的核心问题:GIL 与并发模型选择GIL(Global Interpreter Lock)是 Python 的一个机制:同一时刻只有一个线程执行 Python 字节码。这意味着Python 的多线程不能真正利用多核 CPU。importthreadingimporttime# CPU 密集型任务defcpu_task(n):returnsum(i*iforiinrange(n))# I/O 密集型任务defio_task():time.sleep(0.1)# 模拟 I/O 等待return"done"# 问题:CPU 密集型任务在多线程下并不会加速threads=[threading.Thread(target=cpu_task,args=(1000000,))for_inrange(4)]start=time.time()fortinthreads:t.start()fortinthreads:t.join()print(f"4 线程 CPU 任务耗时:{time.time()-start:.2f}s")# 结果:耗时和单线程几乎相同(因为 GIL)# I/O 密集型任务在多线程下有加速(因为有等待时间)threads=[threading.Thread(target=io_task)for_inrange(4)]start=time.time()fortinthreads:t.start()fortinthreads:t.join()print(f"4 线程 I/O 任务耗时:{time.time()-start:.2f}s")# 结果:大约 0.1s(4 个任务同时等待)2. concurrent.futures:线程池与进程池concurrent.futures是 Python 3.2 引入的高层并发抽象,简化了线程池和进程池的使用。fromconcurrent.futuresimport(ThreadPoolExecutor,ProcessPoolExecutor,as_completed,wait,Future)importtimeimportos# ========== ThreadPoolExecutor ==========# 基本用法withThreadPoolExecutor(max_workers=4)asexecutor:# 提交任务future1=executor.submit(pow,2,3)# 2^3 = 8future2=executor.submit(pow,3,2)# 3^2 = 9# 获取结果print(future1.result())# 8print(future2.result())# 9# map 用法(批量提交)deffetch_url(url):returnf"Fetched:{url}"urls=[f"http://example.com/{i}"foriinrange(10)]withThreadPoolExecutor(max_workers=5)asexecutor:results=executor.map(fetch_url,urls)forrinresults:print(r)# as_completed:按完成顺序处理结果withThreadPoolExecutor(max_workers=5)asexecutor:futures=[executor.submit(io_task)for_inrange(10)]forfutureinas_completed(futures):print(f"Task completed:{future.result()}")# ========== ProcessPoolExecutor ==========defcpu_intensive_task(n):"""CPU 密集型任务"""# 使用多进程绕过 GILreturnsum(i*iforiinrange(n))withProcessPoolExecutor(max_workers=4)asexecutor:results=list(executor.map(cpu_intensive_task,[1000000]*8))print(f"Results:{results}")3. ThreadPoolExecutor:I/O 密集型任务importaiohttpfromconcurrent.futuresimportThreadPoolExecutor,as_completedimportthreadingimporttime# 场景:并发下载多个文件defdownload_file(session,url,timeout=30):"""在线程中执行同步 HTTP 请求"""try:withsession.get(url,timeout=timeout)asresp:return{"url":url,"status":resp.status,"content_length":len(resp.content.read()),}exceptExceptionase:return{"url":url,"error":str(e)}defconcurrent_download(urls,max_workers=10):"""并发下载文件"""importrequests results=[]# 使用 requests.Session() 保持连接复用withThreadPoolExecutor(max_workers=max_workers)asexecutor:withrequests.Session()assession:futures={executor.submit(download_file,session,url):urlforurlinurls}forfutureinas_completed(futures):url=futures[future]try:result=future.result()results.append(result)exceptExceptionase:results.append({"url":url,"error":str(e)})returnresults# 使用 aiohttp 的异步版本(更适合 I/O 密集)asyncdefasync_download(urls,max_concurrent=10):"""异步并发下载(推荐)"""importaiohttp semaphore=asyncio.Semaphore(max_concurrent)asyncdeffetch(session,url):asyncwithsemaphore:try:asyncwithsession.get(url)asresp:content=awaitresp.read()return{"url":url,"status":resp.status,"size":len(content)}exceptExceptionase:return{"url":url,"error":str(e)}asyncwithaiohttp.ClientSession()assession:tasks=[fetch(session,url)forurlinurls]returnawaitasyncio.gather(*tasks)

更多文章