## Basic Concepts of Multiprocessing

In Python, a process is an independent unit of program execution. Multiprocess programming lets a program run several tasks at once and take full advantage of multi-core CPUs. Unlike multithreading, multiprocessing is not constrained by the GIL (Global Interpreter Lock), so it can achieve true parallel execution.
## Creating and Starting Processes

### Using the multiprocessing module

```python
import multiprocessing
import time

def task(name):
    print(f"Task {name} started")
    time.sleep(2)
    print(f"Task {name} completed")

# Guard the entry point so the module can be safely re-imported
# by child processes on spawn-based platforms (Windows, macOS).
if __name__ == "__main__":
    process1 = multiprocessing.Process(target=task, args=('A',))
    process2 = multiprocessing.Process(target=task, args=('B',))

    process1.start()
    process2.start()

    process1.join()
    process2.join()

    print("All tasks completed")
```
### Using Inheritance

```python
import multiprocessing
import time

class MyProcess(multiprocessing.Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f"Task {self.name} started")
        time.sleep(2)
        print(f"Task {self.name} completed")

if __name__ == "__main__":
    process1 = MyProcess('A')
    process2 = MyProcess('B')

    process1.start()
    process2.start()

    process1.join()
    process2.join()

    print("All tasks completed")
```
## Inter-Process Communication

### Queue

A queue is one way for processes to communicate: it lets processes pass data to each other safely.
```python
import multiprocessing
import time

def producer(queue):
    for i in range(5):
        time.sleep(1)
        queue.put(i)
        print(f"Produced: {i}")

def consumer(queue):
    for _ in range(5):
        item = queue.get()
        print(f"Consumed: {item}")
        time.sleep(1)

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    process1 = multiprocessing.Process(target=producer, args=(queue,))
    process2 = multiprocessing.Process(target=consumer, args=(queue,))

    process1.start()
    process2.start()

    process1.join()
    process2.join()

    print("All tasks completed")
```
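The consumer above relies on knowing exactly how many items the producer will send. A common alternative is a sentinel value marking the end of the stream; the sketch below uses `None` as the sentinel (which assumes `None` is never a real payload), and `run` is a helper name invented for this example:

```python
import multiprocessing
from queue import Empty

SENTINEL = None  # marks the end of the stream

def producer(queue):
    for i in range(5):
        queue.put(i)
    queue.put(SENTINEL)  # tell the consumer no more items are coming

def consumer(queue, results):
    while True:
        item = queue.get()
        if item is SENTINEL:
            break
        results.put(item)

def run():
    queue = multiprocessing.Queue()
    results = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(queue,))
    c = multiprocessing.Process(target=consumer, args=(queue, results))
    p.start()
    c.start()
    p.join()
    c.join()
    # Drain the results queue back in the parent process.
    out = []
    while True:
        try:
            out.append(results.get(timeout=1))
        except Empty:
            break
    return out

if __name__ == "__main__":
    print(run())
```

With a sentinel, the consumer no longer needs to hard-code the item count, so the producer can send any number of items.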
### Pipe

A pipe is another inter-process communication mechanism; it connects two processes and supports two-way communication.
```python
import multiprocessing
import time

def sender(pipe):
    for i in range(5):
        time.sleep(1)
        pipe.send(i)
        print(f"Sent: {i}")
    # Send a sentinel instead of relying on EOFError: forked children
    # can inherit extra copies of the write end, so the reader may
    # never see EOF even after the sender closes its connection.
    pipe.send(None)
    pipe.close()

def receiver(pipe):
    while True:
        item = pipe.recv()
        if item is None:  # sentinel received, no more data
            break
        print(f"Received: {item}")

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    process1 = multiprocessing.Process(target=sender, args=(child_conn,))
    process2 = multiprocessing.Process(target=receiver, args=(parent_conn,))

    process1.start()
    process2.start()

    process1.join()
    process2.join()

    print("All tasks completed")
```
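The example above only sends data in one direction. Since `multiprocessing.Pipe()` is duplex by default, the same connection can carry requests one way and replies the other. A minimal request/reply sketch (`worker` and `run` are illustrative names for this example):

```python
import multiprocessing

def worker(conn):
    # Receive a request and send the reply back on the same connection.
    while True:
        msg = conn.recv()
        if msg == "quit":
            break
        conn.send(msg * 2)
    conn.close()

def run():
    parent_conn, child_conn = multiprocessing.Pipe()  # duplex by default
    p = multiprocessing.Process(target=worker, args=(child_conn,))
    p.start()
    replies = []
    for i in range(3):
        parent_conn.send(i)          # request
        replies.append(parent_conn.recv())  # reply on the same connection
    parent_conn.send("quit")
    p.join()
    return replies

if __name__ == "__main__":
    print(run())
```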
### Shared Memory (Value and Array)

Shared memory lets processes access the same block of memory directly, which makes data exchange more efficient.
```python
import multiprocessing

def increment(counter, array):
    for i in range(1000000):
        # += is a read-modify-write, not atomic: without the lock
        # the two processes would race and corrupt the counts.
        with counter.get_lock():
            counter.value += 1
        with array.get_lock():
            array[i % len(array)] += 1

def decrement(counter, array):
    for i in range(1000000):
        with counter.get_lock():
            counter.value -= 1
        with array.get_lock():
            array[i % len(array)] -= 1

if __name__ == "__main__":
    counter = multiprocessing.Value('i', 0)
    array = multiprocessing.Array('i', [0] * 10)

    process1 = multiprocessing.Process(target=increment, args=(counter, array))
    process2 = multiprocessing.Process(target=decrement, args=(counter, array))

    process1.start()
    process2.start()

    process1.join()
    process2.join()

    print(f"Final counter value: {counter.value}")
    print(f"Final array values: {list(array)}")
```
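`Value` and `Array` only hold C-typed data. For richer shared structures, `multiprocessing.Manager` offers proxied objects such as dicts and lists, at the cost of routing every access through a server process. A sketch with illustrative names (`count_words`, `run`):

```python
import multiprocessing

def count_words(text, shared_dict, lock):
    for word in text.split():
        with lock:  # protect the read-modify-write on the proxied dict
            shared_dict[word] = shared_dict.get(word, 0) + 1

def run():
    with multiprocessing.Manager() as manager:
        counts = manager.dict()   # proxy object usable from any process
        lock = manager.Lock()
        texts = ["a b a", "b c b"]
        procs = [
            multiprocessing.Process(target=count_words, args=(t, counts, lock))
            for t in texts
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Copy out of the proxy before the manager shuts down.
        return dict(counts)

if __name__ == "__main__":
    print(run())
```

A Manager is slower than raw shared memory but far more flexible, and the proxies can be passed freely between processes.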
## Process Pools

A process pool manages worker processes for you and avoids the overhead of repeatedly creating and destroying processes.
```python
from concurrent.futures import ProcessPoolExecutor
import time

def task(name):
    print(f"Task {name} started")
    time.sleep(2)
    print(f"Task {name} completed")
    return f"Result of task {name}"

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        for future in futures:
            result = future.result()
            print(f"Received: {result}")

    print("All tasks completed")
```
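The loop above collects results in submission order, blocking on each future in turn. `concurrent.futures.as_completed` instead yields futures as they finish, which helps when task durations vary. A minimal sketch (`square` and `run` are names invented for this example):

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def square(n):
    return n * n

def run():
    results = set()
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(square, i) for i in range(5)]
        # as_completed yields futures in completion order,
        # not submission order.
        for future in as_completed(futures):
            results.add(future.result())
    return results

if __name__ == "__main__":
    print(sorted(run()))
```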
## Pros and Cons of Multiprocessing

### Advantages
- **True parallelism**: multiprocessing is not limited by the GIL and can use every core of a multi-core CPU.
- **Better fault isolation**: if one process crashes, the others keep running.
- **Independent memory spaces**: each process has its own address space, which avoids the hazards of shared mutable state.
### Disadvantages
- **Higher startup cost**: creating a process is more expensive than creating a thread.
- **Higher communication cost**: inter-process communication costs more than communication between threads.
- **Higher resource usage**: every process needs its own memory and CPU resources.
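The startup-cost point can be sanity-checked with a rough timing sketch. Absolute numbers vary widely by machine and start method, and `timed_start_join` is a helper invented for this example:

```python
import multiprocessing
import threading
import time

def noop():
    pass

def timed_start_join(make_worker, n=20):
    """Start and join n workers, returning the elapsed wall time."""
    start = time.perf_counter()
    workers = [make_worker() for _ in range(n)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return time.perf_counter() - start

def run():
    t_thread = timed_start_join(lambda: threading.Thread(target=noop))
    t_proc = timed_start_join(lambda: multiprocessing.Process(target=noop))
    return t_thread, t_proc

if __name__ == "__main__":
    t_thread, t_proc = run()
    print(f"20 threads: {t_thread:.4f}s, 20 processes: {t_proc:.4f}s")
```

On most systems the process column comes out noticeably larger, especially under the spawn start method, which re-imports the main module for every child.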
## Choosing Between Multiprocessing and Multithreading

| Scenario | Recommended | Reason |
| --- | --- | --- |
| CPU-bound tasks | Multiprocessing | Uses all CPU cores; not limited by the GIL |
| I/O-bound tasks | Multithreading | Low startup cost, convenient communication |
| High stability requirements | Multiprocessing | A crash in one process does not affect the others |
| Tight memory budget | Multithreading | Threads share memory, so memory use is more efficient |
## Practical Examples

### Parallel Computation

```python
from concurrent.futures import ProcessPoolExecutor
import time

def compute(n):
    """Sum the integers from 1 to n."""
    total = 0
    for i in range(1, n + 1):
        total += i
    return total

if __name__ == "__main__":
    tasks = [100000000, 200000000, 300000000, 400000000]

    # Parallel version
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(compute, tasks))
    end_time = time.time()
    print(f"Results: {results}")
    print(f"Time taken: {end_time - start_time:.2f} seconds")

    # Sequential version for comparison
    start_time = time.time()
    results_seq = [compute(n) for n in tasks]
    end_time = time.time()
    print(f"Sequential results: {results_seq}")
    print(f"Sequential time taken: {end_time - start_time:.2f} seconds")
```
### Parallel File Processing

```python
import multiprocessing

def process_file(filename):
    """Count the words in a file."""
    with open(filename, 'r', encoding='utf-8') as f:
        content = f.read()
    word_count = len(content.split())
    print(f"File {filename} has {word_count} words")
    return word_count

if __name__ == "__main__":
    files = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt']
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(process_file, files)
    print(f"Total word count: {sum(results)}")
```
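The file names in the example above are placeholders. The following variant creates a few hypothetical sample files in a temporary directory so the sketch is self-contained and can actually run:

```python
import multiprocessing
import os
import tempfile

def process_file(filename):
    """Count the words in one file."""
    with open(filename, "r", encoding="utf-8") as f:
        return len(f.read().split())

def run():
    # Create hypothetical sample files so the example is self-contained.
    with tempfile.TemporaryDirectory() as tmpdir:
        contents = ["one two three", "four five", "six"]
        paths = []
        for i, text in enumerate(contents):
            path = os.path.join(tmpdir, f"sample{i}.txt")
            with open(path, "w", encoding="utf-8") as f:
                f.write(text)
            paths.append(path)
        with multiprocessing.Pool(processes=2) as pool:
            word_counts = pool.map(process_file, paths)
    return word_counts

if __name__ == "__main__":
    print(run())
```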
## Summary

Multiprocessing is a powerful tool in Python: it lets you exploit multi-core CPUs and make programs run faster. This article has covered its core concepts and the main ways to use it.

Keep in mind that multiprocessing also brings challenges, such as the cost of inter-process communication and higher resource consumption. Choose the communication mechanism that fits your scenario, and watch out for the common pitfalls.

For CPU-bound tasks, multiprocessing is a good choice because it can use every core. For I/O-bound tasks, multithreading is often a better fit, since threads are cheaper to start and easier to coordinate.

Used judiciously, multiprocessing and multithreading let you write faster, more responsive Python programs.