python ProcessPoolExecutor

张开发
2026/4/7 5:38:22 15 分钟阅读

分享文章

python ProcessPoolExecutor
# Python 与 bz2不只是个压缩工具在 Python 的标准库里藏着不少像 bz2 这样不太起眼但相当实用的模块。第一次接触它的时候可能觉得这不过是个压缩解压的工具但用久了会发现它在数据处理流程中扮演的角色远比想象中重要。它到底是什么bz2 模块实现了 bzip2 压缩算法的 Python 接口。bzip2 这个算法有些年头了是 Julian Seward 在 1996 年开发的采用 Burrows-Wheeler 变换和霍夫曼编码的组合。听起来挺学术的但简单来说它是一种在特定场景下压缩效果特别好的算法尤其擅长处理文本类数据。和日常用的 zip 不同bz2 通常用于单个文件的压缩而不是打包多个文件。在 Linux 世界里.tar.bz2 这种格式很常见——先用 tar 打包再用 bz2 压缩。它能做什么最直接的用途当然是压缩和解压文件。比如有个几 GB 的日志文件需要传输或归档用 bz2 压缩后可能只剩原来的十分之一大小节省不少存储空间和传输时间。但 bz2 的价值不止于此。在数据处理流水线中经常需要处理压缩过的数据文件。很多公开数据集比如维基百科的数据库备份就是用 bz2 格式发布的。能够直接读取这些压缩文件而不需要先解压这个功能在实际工作中特别实用。还有些场景下需要在内存中对数据进行压缩。比如缓存大量文本数据时可以先压缩再存储虽然消耗一些 CPU 时间但能显著减少内存占用。怎么使用用 bz2 模块其实挺直观的。对于文件操作有现成的高层接口。读取压缩文件可以这样importbz2withbz2.open(data.txt.bz2,rt,encodingutf-8)asf:contentf.read()# 像处理普通文本文件一样处理内容注意那个rt参数r表示读取t表示文本模式。如果处理的是二进制数据就用rb。写文件也类似用wt或wb模式。有时候数据不在文件里而是在字符串或字节对象中这时候可以用压缩函数直接处理original_datab这是一段需要压缩的数据*100compressedbz2.compress(original_data)# compressed 现在小多了decompressedbz2.decompress(compressed)# 又恢复原样了对于特别大的文件流式处理更合适。bz2 提供了 BZ2Compressor 和 BZ2Decompressor 类可以边读边压缩或者边解压边处理不会一下子把整个文件加载到内存里。compressorbz2.BZ2Compressor()chunks[]withopen(large_file.txt,rb)asf:whilechunk:f.read(8192):compressed_chunkcompressor.compress(chunk)ifcompressed_chunk:chunks.append(compressed_chunk)# 最后别忘了刷新压缩器chunks.append(compressor.flush())compressed_datab.join(chunks)一些实际经验用 bz2 有些细节需要注意。压缩级别是个可调参数从 1 到 9数字越大压缩率越高但速度越慢。默认级别是 9追求最高压缩率。但如果对速度有要求可以适当调低级别。# 快速压缩压缩率稍低withbz2.open(output.bz2,wb,compresslevel1)asf:f.write(data)处理文本时编码问题容易出岔子。如果压缩时用了某种编码解压时要用同样的编码。utf-8 通常是最安全的选择。内存使用方面解压大文件时流式处理几乎是必须的。曾经遇到过有人试图一次性解压几个 GB 的压缩文件结果内存爆了。用迭代的方式处理就能避免这个问题withbz2.open(huge_file.bz2,rt)asf:forlineinf:process_line(line)# 一次只处理一行错误处理也很重要。压缩文件可能损坏或者解压时内存不足。好的做法是用 try-except 包裹可能出错的操作特别是 bz2.BZ2Error。和其他压缩方式的对比Python 标准库里还有 gzip 和 lzma 模块都提供压缩功能。gzip 基于 DEFLATE 算法压缩和解压速度通常比 bz2 快但压缩率一般不如 bz2。如果是网络传输或者需要快速压缩的场景gzip 可能更合适。很多 HTTP 服务器默认就支持 gzip 压缩。lzma 是相对较新的算法压缩率通常比 bz2 还要高但速度也更慢。如果需要极致压缩率而且不介意等待lzma 是个选择。选择哪种压缩方式其实是在压缩率、速度和资源消耗之间权衡。bz2 处在中间位置——压缩率不错速度可以接受内存使用也相对合理。对于需要长期存储、不经常访问的数据比如归档日志bz2 是个平衡的选择。还有个实际考虑是兼容性。几乎所有的 Linux 系统都自带 bzip2 工具跨平台共享数据时不用担心对方打不开。gzip 的兼容性更好lzma 相对新一些可能不是所有系统都默认安装。在 Python 生态里这些压缩模块的接口设计得很一致都模仿了内置的 open() 函数。学会用一个其他的也差不多会用了。这种一致性让在不同压缩格式间切换变得容易。最后bz2 这样的工具初看简单但用好了能解决不少实际问题。它不只是个压缩解压的库# ## 聊聊 Python 里的 lzma一个容易被忽略的压缩工具平时处理数据或者备份文件的时候经常会遇到需要压缩的情况。大家可能第一时间会想到 zip 或者 gzip毕竟它们太常见了。但在 Python 的标准库里还藏着一个叫 lzma 的模块它提供的压缩能力其实相当强悍只是平时用的人不太多。它到底是什么lzma 这个名字来源于 Lempel-Ziv-Markov chain Algorithm 这个压缩算法的缩写。听起来有点复杂其实可以把它理解为一套非常高效的压缩规则。这个算法最早用在 7-Zip 这个压缩软件里后来被收录到了 Python 的标准库中。在 Python 里lzma 不仅仅是一个算法名称更是一个可以直接调用的模块。它提供了完整的压缩和解压缩功能而且因为被集成在标准库里不需要安装任何额外的包就能直接用。这一点对于环境部署来说特别方便不用担心依赖问题。它能做什么这个模块主要就干两件事压缩数据和解压数据。但这里说的“数据”范围很广可以是字符串、文件甚至是网络传输的字节流。比如手头有一个很大的文本文件可能是日志文件或者数据导出文件占用了几百兆的空间。直接存储和传输都不太方便这时候就可以用 lzma 把它压缩一下体积可能会缩小到原来的三分之一甚至更小。等到需要用的时候再解压回来就行了。除了压缩文件它还能处理内存里的数据。有时候程序运行时会产生一些中间数据暂时不用但后面还要用到全放在内存里太占地方直接扔掉又不行。这时候可以把它压缩成字节形式放在内存里或者暂存到磁盘上能省下不少空间。具体怎么使用用 lzma 最简单的方式就是直接处理文件。打开文件、读取内容、压缩、写入新文件这几步操作 lzma 都提供了很直观的函数。举个例子假设要压缩一个叫data.log的日志文件代码写起来差不多是这样importlzmaimportshutilwithopen(data.log,rb)asf_in:withlzma.open(data.log.xz,wb)asf_out:shutil.copyfileobj(f_in,f_out)解压也一样简单把打开文件的顺序反过来就行。这种用法和内置的gzip模块几乎一模一样如果之前用过 gzip切换到 lzma 几乎不需要学习成本。除了这种文件级的操作有时候可能需要对一段字符串或者字节数据进行压缩。lzma 也提供了直接压缩内存数据的函数original_datab这是一段需要压缩的原始数据可能会重复出现很多次类似的内容...compressedlzma.compress(original_data)# 压缩后的 compressed 就可以存储或传输了# 需要的时候再解压回来decompressedlzma.decompress(compressed)实际使用中还可以调整压缩级别。级别越高压缩得越小但消耗的 CPU 和时间也越多。默认级别通常是个不错的平衡点但如果对压缩率有特别要求可以自己调整。一些使用上的建议虽然 lzma 用起来不难但有些细节还是需要注意的。首先要注意的是压缩和解压都比较消耗 CPU特别是压缩大文件的时候。如果是在服务器上运行最好留意一下 CPU 使用率避免影响其他服务。压缩级别不是越高越好。级别 9 的压缩率可能只比级别 6 高一点点但花费的时间可能要多好几倍。通常级别 6 到 8 是比较实用的选择能在压缩率和速度之间取得不错的平衡。处理大文件的时候最好不要一次性把整个文件读进内存。用上面那种流式处理的方式一边读一边压缩对内存友好得多。特别是处理几个 G 的大文件时这种区别会非常明显。压缩后的文件扩展名通常是.xz这也是 lzma 算法最常用的格式。如果希望别人也能方便地解压用这个扩展名会比较通用大部分压缩软件都认识它。和其他压缩方式的对比和常用的 gzip 相比lzma 的压缩率通常更高也就是说能把文件压得更小。但这是有代价的——压缩和解压都需要更多的计算资源速度会慢一些。所以如果追求极致的压缩率而且不太在乎时间lzma 是个好选择。如果只是随便压一下想要快一点gzip 可能更合适。和 zip 格式相比lzma 通常只压缩不打包而 zip 既能打包多个文件又能压缩。不过 Python 里可以用 lzma 配合其他模块实现类似的功能只是稍微麻烦一点。还有一个不太明显的优势是lzma 在某些类型的数据上表现特别出色。比如文本文件、源代码这些有很多重复内容的数据lzma 的压缩效果往往比# # 关于Python的ProcessPoolExecutor你可能需要知道这些在Python的世界里处理并行任务一直是个有趣的话题。当你需要同时做很多事情比如同时下载多个文件、处理大量数据或者同时运行多个计算任务时单线程就显得力不从心了。这时候ProcessPoolExecutor就进入了我们的视野。他是什么ProcessPoolExecutor是Python标准库concurrent.futures模块中的一个类。简单来说它是一个进程池执行器能够让你以相对简单的方式利用多核CPU来并行执行任务。它不像多线程那样受限于全局解释器锁而是通过创建多个独立的Python进程来真正实现并行计算。想象一下你有一个厨房里面只有一个厨师。如果要做一桌菜这个厨师就得一道一道地做效率自然不高。ProcessPoolExecutor就像是请来了几个厨师每个人负责一道菜同时进行最后一起上桌。他能做什么ProcessPoolExecutor主要解决的是CPU密集型任务的并行化问题。所谓CPU密集型就是那些需要大量计算、不怎么需要等待外部输入输出的任务。比如图像处理、数据加密解密、科学计算、机器学习模型的训练等。如果你要处理一万张图片每张都需要进行复杂的滤镜处理用单进程可能需要几个小时。但如果你有8个CPU核心用ProcessPoolExecutor可能只需要原来的八分之一时间。不过需要注意的是它不太适合I/O密集型任务。如果你的任务大部分时间都在等待网络响应或者磁盘读写那么多线程或者异步IO可能是更好的选择。怎么使用使用ProcessPoolExecutor其实比想象中简单。先来看个最基本的例子fromconcurrent.futuresimportProcessPoolExecutorimporttimedefprocess_item(item):# 模拟一个耗时的计算任务time.sleep(1)returnitem*2if__name____main__:items[1,2,3,4,5,6,7,8]withProcessPoolExecutor(max_workers4)asexecutor:resultslist(executor.map(process_item,items))print(results)这段代码创建了一个最多有4个工作进程的进程池然后并行处理8个任务。由于只有4个进程所以会分两批处理总共耗时大约2秒左右而不是单进程的8秒。ProcessPoolExecutor提供了两种主要的使用方式map和submit。map适合处理可迭代对象它会自动分配任务并收集结果。submit则更灵活可以提交单个任务返回一个Future对象你可以稍后获取结果。fromconcurrent.futuresimportProcessPoolExecutor,as_completeddefprocess_item(item):time.sleep(item)# 模拟不同耗时的任务returnf处理完成:{item}if__name____main__:withProcessPoolExecutor()asexecutor:# 使用submit提交任务futures[executor.submit(process_item,i)foriin[1,3,2]]# 按完成顺序获取结果forfutureinas_completed(futures):print(future.result())这里用submit提交了三个任务耗时分别是1秒、3秒和2秒。使用as_completed可以按照任务完成的顺序获取结果而不是按照提交的顺序。最佳实践使用ProcessPoolExecutor时有几个细节值得注意。首先进程池的创建是有成本的。每个工作进程都需要启动一个完整的Python解释器这会消耗时间和内存。所以不要为每个小任务都创建新的进程池最好是复用同一个。其次max_workers参数的选择需要权衡。默认情况下它会根据CPU核心数来设置但有时候你可能需要手动调整。如果任务完全是CPU密集型的那么设置为核心数通常是最优的。但如果任务还涉及一些I/O等待可以适当增加工作进程数。数据序列化也是一个需要考虑的问题。因为进程间通信需要通过序列化和反序列化所以传递给工作进程的参数和返回的结果都必须是可序列化的。如果传递大型对象序列化的开销可能会抵消并行带来的收益。还有一个常见的陷阱是在Windows系统上使用。由于Windows的进程创建机制不同使用ProcessPoolExecutor时一定要把主要逻辑放在if __name__ __main__:下面否则可能会遇到奇怪的错误。异常处理也很重要。工作进程中发生的异常不会自动传播到主进程而是会被封装在Future对象中。当你调用result()时如果任务发生了异常这个异常会被重新抛出。所以最好用try-except包裹result()调用。fromconcurrent.futuresimportProcessPoolExecutordefrisky_task(x):ifx0:raiseValueError(不能处理0)return10/xif__name____main__:withProcessPoolExecutor()asexecutor:futures[executor.submit(risky_task,i)foriinrange(3)]forfutureinfutures:try:resultfuture.result()print(f结果:{result})exceptExceptionase:print(f任务失败:{e})和同类技术对比Python中实现并行的方式不止一种每种都有其适用场景。多线程适合I/O密集型任务但由于全局解释器锁的存在多线程在CPU密集型任务上并不能真正并行。multiprocessing模块是Python另一个实现多进程的模块它提供了更底层的控制。ProcessPoolExecutor可以看作是multiprocessing.Pool的一个更现代的封装API更加简洁一致特别是与ThreadPoolExecutor保持了相同的接口。asyncio则是另一种思路它基于事件循环和协程适合高并发的I/O操作。但asyncio本身并不能利用多核CPU如果需要CPU并行通常需要结合ProcessPoolExecutor使用。最近几年还有一些第三方库提供了更高级的并行计算能力比如joblib、dask等。这些库通常提供了更丰富的功能比如延迟计算、任务调度优化等但相应地也更加复杂。选择哪种技术主要取决于你的具体需求。如果只是简单的CPU并行ProcessPoolExecutor通常就足够了。它的优势在于简单易用是Python标准库的一部分不需要额外安装而且API设计得很友好。在实际项目中经常会看到这样的模式用asyncio处理高并发的I/O用ProcessPoolExecutor处理CPU密集的计算两者结合发挥各自的优势。最后想说并行计算并不是银弹。它增加了程序的复杂度引入了新的问题比如进程间通信、数据一致性、调试困难等。在决定使用并行之前最好先确认是否真的需要。有时候优化算法或者使用更高效的数据结构可能比并行化带来更大的性能提升。 gzip 好不少。但对于已经是压缩格式的文件比如图片、视频再压一次的意义就不大了有时候甚至还会变大。总的来说lzma 算是 Python 标准库里一个被低估的工具。它可能不像 gzip 那样随处可见但在需要高压缩率的场景下它能提供相当不错的效果。而且由于是标准库的一部分兼容性和稳定性都有保证不用担心哪天突然不能用了。下次需要压缩数据的时候不妨考虑一下这个选项。更是数据处理工具箱中的一件实用工具。理解它的特点知道什么时候该用它什么时候该用其他方案这种判断力可能比记住所有 API 细节更有价值。实际工作中数据压缩往往不是最终目的而是整个工作流中的一环。选择压缩方案时得考虑数据的特点、处理流程、性能要求还有团队的技术栈。bz2 不一定总是最好的选择但在很多场景下它确实是个可靠、实用的选项。

更多文章