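I. Basic Concepts of Multiprocessing

In Python, a process is an independent unit of program execution with its own memory space. Multiprocessing lets a program run several tasks at the same time and make full use of a multi-core CPU. Multithreading in CPython is limited by the GIL (Global Interpreter Lock). Multiprocessing is not: each process runs its own interpreter with its own GIL, so processes can execute truly in parallel.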
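"Its own memory space" has a concrete consequence. A child process works on its own copy of module-level data, so changes it makes are invisible to the parent. The minimal sketch below shows this. The names `counter` and `child_task` are illustrative only. The child reports its PID and its view of the variable back through a `multiprocessing.Queue`:

```python
import multiprocessing
import os

# Module-level variable: every process ends up with its own separate copy.
counter = 0


def child_task(queue):
    """Change the global inside the child and report what the child sees."""
    global counter
    counter += 100
    queue.put((os.getpid(), counter))


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=child_task, args=(queue,))
    p.start()
    child_pid, child_counter = queue.get()  # read before join() to avoid blocking
    p.join()
    parent_pid = os.getpid()
    print(f"parent pid={parent_pid}, counter={counter}")        # counter stays 0
    print(f"child  pid={child_pid}, counter={child_counter}")   # child saw 100
```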
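II. Creating and Starting Processes

1. Using the multiprocessing Module

```python
import multiprocessing
import time


def task(name):
    print(f"Task {name} started")
    time.sleep(2)
    print(f"Task {name} completed")


# The __main__ guard is required with the "spawn" start method (the default
# on Windows and macOS): each child re-imports this module, and without the
# guard that import would try to start new processes and raise RuntimeError.
if __name__ == "__main__":
    process1 = multiprocessing.Process(target=task, args=('A',))
    process2 = multiprocessing.Process(target=task, args=('B',))

    process1.start()  # both processes now run concurrently
    process2.start()

    process1.join()   # wait for both to finish
    process2.join()

    print("All tasks completed")
```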
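Besides `start()` and `join()`, a `Process` object exposes its state through `is_alive()`, `pid`, and `exitcode`. The sketch below checks these at each stage of a process's life. The process name `worker-A` is just an illustrative choice, and the 0.5-second sleep only keeps the child busy long enough to observe it running:

```python
import multiprocessing
import time


def task(name):
    # Keep the child busy briefly so it can be observed while running
    time.sleep(0.5)


if __name__ == "__main__":
    p = multiprocessing.Process(target=task, args=("A",), name="worker-A")
    before = (p.is_alive(), p.exitcode)          # not started: (False, None)
    p.start()
    during = (p.pid is not None, p.is_alive())   # running: (True, True)
    p.join()
    after = (p.is_alive(), p.exitcode)           # exited normally: (False, 0)
    print(p.name, before, during, after)
```

An `exitcode` of 0 means the target returned normally. A non-zero value means the process exited with an error.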
2. Subclassing Process

```python
import multiprocessing
import time


class MyProcess(multiprocessing.Process):
    def __init__(self, name):
        super().__init__()
        # Process already defines a `name` property, so this assignment
        # also sets the process name that multiprocessing reports.
        self.name = name

    def run(self):
        # run() is what executes in the child process after start()
        print(f"Task {self.name} started")
        time.sleep(2)
        print(f"Task {self.name} completed")


if __name__ == "__main__":
    process1 = MyProcess('A')
    process2 = MyProcess('B')

    process1.start()
    process2.start()

    process1.join()
    process2.join()

    print("All tasks completed")
```
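The subclass style has one common pitfall. `run()` executes in the child, so any attribute it sets changes only the child's copy of the object. After `join()`, the parent still sees the old value. The sketch below shows this with a hypothetical `SquareProcess` class, which sends its result back explicitly through a queue instead:

```python
import multiprocessing


class SquareProcess(multiprocessing.Process):
    def __init__(self, n, queue):
        super().__init__()
        self.n = n
        self.queue = queue
        self.result = None  # set in the parent; the child works on a copy

    def run(self):
        # This assignment only changes the child's copy of the object
        self.result = self.n * self.n
        # Send the value back explicitly so the parent can see it
        self.queue.put(self.result)


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    p = SquareProcess(7, queue)
    p.start()
    from_queue = queue.get()
    p.join()
    print(p.result, from_queue)  # None 49
```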
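III. Inter-Process Communication

1. Queue

A `multiprocessing.Queue` is one way for processes to communicate. It lets them pass (picklable) objects to each other safely, with the necessary locking handled internally: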
```python
import multiprocessing
import time


def producer(queue):
    for i in range(5):
        time.sleep(1)
        queue.put(i)  # the object is pickled and sent to the other process
        print(f"Produced: {i}")


def consumer(queue):
    for _ in range(5):
        item = queue.get()  # blocks until an item is available
        print(f"Consumed: {item}")
        time.sleep(1)


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    process1 = multiprocessing.Process(target=producer, args=(queue,))
    process2 = multiprocessing.Process(target=consumer, args=(queue,))

    process1.start()
    process2.start()

    process1.join()
    process2.join()

    print("All tasks completed")
```
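The consumer above only works because it knows in advance that exactly five items will arrive. A common alternative is a sentinel value that marks the end of the stream. The sketch below uses `None` as the sentinel, which assumes `None` is never a real data item. The consumer sends its results back on a second queue. The parent drains that queue before calling `join()`, because joining a process that still has unconsumed items in a queue can deadlock:

```python
import multiprocessing


def producer(queue, items):
    for item in items:
        queue.put(item)
    queue.put(None)  # sentinel: no more items


def consumer(queue, results):
    while True:
        item = queue.get()
        if item is None:  # stop when the sentinel arrives
            break
        results.put(item * 10)
    results.put(None)  # tell the parent the consumer is done


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    results = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(queue, range(5)))
    c = multiprocessing.Process(target=consumer, args=(queue, results))
    p.start()
    c.start()

    collected = []
    while (value := results.get()) is not None:
        collected.append(value)

    p.join()
    c.join()
    print(collected)  # [0, 10, 20, 30, 40]
```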
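2. Pipe

A pipe is another way to communicate. `multiprocessing.Pipe()` returns a pair of connection objects, one for each end. By default (`duplex=True`) the pipe is bidirectional, so each end can both send and receive: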
```python
import multiprocessing
import time


def sender(conn):
    for i in range(5):
        time.sleep(1)
        conn.send(i)
        print(f"Sent: {i}")
    conn.send(None)  # sentinel: no more data
    conn.close()


def receiver(conn):
    # Waiting for EOFError alone can hang forever: recv() only raises it once
    # *every* copy of the other end is closed, and the parent process still
    # holds one. An explicit sentinel ends the loop reliably.
    while True:
        item = conn.recv()
        if item is None:
            break
        print(f"Received: {item}")
    conn.close()


if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    process1 = multiprocessing.Process(target=sender, args=(child_conn,))
    process2 = multiprocessing.Process(target=receiver, args=(parent_conn,))

    process1.start()
    process2.start()

    process1.join()
    process2.join()

    print("All tasks completed")
```
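The example above only sends data in one direction. To show that the default pipe really is two-way, the sketch below uses a request/response exchange on a single pair of connections. The parent sends numbers, and a hypothetical `square_server` child replies on the same connection. `None` is assumed to be the stop signal:

```python
import multiprocessing


def square_server(conn):
    """Reply to each request on the same connection until None arrives."""
    while True:
        n = conn.recv()
        if n is None:
            break
        conn.send(n * n)
    conn.close()


if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()  # duplex=True by default
    p = multiprocessing.Process(target=square_server, args=(child_conn,))
    p.start()

    replies = []
    for n in [2, 3, 4]:
        parent_conn.send(n)                  # request: parent -> child
        replies.append(parent_conn.recv())   # response: child -> parent
    parent_conn.send(None)                   # ask the child to stop

    p.join()
    parent_conn.close()
    print(replies)  # [4, 9, 16]
```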
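3. Shared Memory (Value and Array)

Shared memory lets processes read and write the same block of memory directly. Queue and Pipe, by contrast, pickle and copy every object they send. Because several processes may modify the same values at once, updates must be protected by a lock: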
```python
import multiprocessing


def increment(counter, array):
    for i in range(100000):
        # `+= 1` on shared memory is a read-modify-write and is not atomic.
        # Without a lock the two processes lose updates and the final value
        # is unpredictable. Both functions use the same lock for both objects.
        with counter.get_lock():
            counter.value += 1
            array[i % len(array)] += 1


def decrement(counter, array):
    for i in range(100000):
        with counter.get_lock():
            counter.value -= 1
            array[i % len(array)] -= 1


if __name__ == "__main__":
    counter = multiprocessing.Value('i', 0)          # shared C int
    array = multiprocessing.Array('i', [0] * 10)     # shared C int array

    process1 = multiprocessing.Process(target=increment, args=(counter, array))
    process2 = multiprocessing.Process(target=decrement, args=(counter, array))

    process1.start()
    process2.start()

    process1.join()
    process2.join()

    print(f"Final counter value: {counter.value}")   # 0
    print(f"Final array values: {list(array)}")      # all zeros
```
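A lock is only needed when processes touch the same elements. If each worker writes only to its own index range, a raw array created with `lock=False` is sufficient and avoids the locking overhead. The sketch below uses this partitioning approach. The helper `fill_squares` and the two-way split are illustrative choices:

```python
import multiprocessing


def fill_squares(shared, start, stop):
    # Each worker writes only to indices [start, stop), so the two workers
    # never touch the same element and no lock is needed.
    for i in range(start, stop):
        shared[i] = i * i


if __name__ == "__main__":
    n = 10
    # lock=False returns a raw shared ctypes array without a lock wrapper
    shared = multiprocessing.Array('i', n, lock=False)
    mid = n // 2
    workers = [
        multiprocessing.Process(target=fill_squares, args=(shared, 0, mid)),
        multiprocessing.Process(target=fill_squares, args=(shared, mid, n)),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(list(shared))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```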
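IV. Process Pools

A process pool keeps a fixed set of worker processes and reuses them for many tasks, which avoids the overhead of repeatedly creating and destroying processes: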
```python
from concurrent.futures import ProcessPoolExecutor
import time


def task(name):
    print(f"Task {name} started")
    time.sleep(2)
    print(f"Task {name} completed")
    return f"Result of task {name}"


if __name__ == "__main__":
    # At most 3 tasks run at once; the remaining 2 wait for a free worker
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(5)]

        # result() blocks until that future is done; results come out in
        # submission order because we iterate over the futures list
        for future in futures:
            result = future.result()
            print(f"Received: {result}")

    print("All tasks completed")
```
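If results should be handled as soon as each task finishes, rather than in submission order, `concurrent.futures.as_completed` yields futures in completion order. `future.result()` also re-raises any exception that was raised in the worker. The sketch below shows both. The `cube` function and its negative-input check are hypothetical, added only to trigger an error:

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def cube(n):
    if n < 0:
        raise ValueError(f"negative input: {n}")
    return n ** 3


if __name__ == "__main__":
    results, errors = {}, {}
    with ProcessPoolExecutor(max_workers=3) as executor:
        future_to_n = {executor.submit(cube, n): n for n in [1, 2, 3, -1]}
        for future in as_completed(future_to_n):  # completion order
            n = future_to_n[future]
            try:
                results[n] = future.result()
            except ValueError as exc:  # raised in the worker, re-raised here
                errors[n] = str(exc)
    print(results, errors)
```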
V. Pros and Cons of Multiprocessing

1. Advantages
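- True parallel execution: each process has its own interpreter and GIL, so CPU-bound work can use every CPU core
- Better stability: a crash in one process (for example, a segfault in a C extension) does not bring down the others
- Memory isolation: each process has its own address space, so processes cannot accidentally corrupt each other's data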
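The stability point can be checked directly. In the sketch below, one child terminates abruptly with `os._exit(3)`, which stands in for a real crash. A second child still finishes normally, and the parent sees each outcome through `exitcode`. The function names are illustrative only:

```python
import multiprocessing
import os


def crashing_task():
    # Simulate an abrupt crash: exit immediately with status 3,
    # skipping all normal cleanup
    os._exit(3)


def healthy_task(queue):
    queue.put("healthy task finished")


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    bad = multiprocessing.Process(target=crashing_task)
    good = multiprocessing.Process(target=healthy_task, args=(queue,))
    bad.start()
    good.start()

    message = queue.get()
    bad.join()
    good.join()
    print(bad.exitcode, good.exitcode, message)  # 3 0 healthy task finished
```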
2. Disadvantages
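- Higher startup cost: creating a process is more expensive than creating a thread
- Higher communication cost: data passed through a Queue or Pipe must be pickled and copied, which is slower than sharing objects between threads
- Higher resource usage: each process carries its own interpreter, memory, and operating-system resources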
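To get a rough sense of the startup overhead on your own machine, you can time creating, starting, and joining a do-nothing worker, first as a thread and then as a process. This is a crude sketch. The helper `time_start_join` is hypothetical, and the absolute numbers vary widely with the OS and the start method ("spawn" is much slower than "fork"), so no expected output is given:

```python
import multiprocessing
import threading
import time


def noop():
    pass


def time_start_join(factory, count=20):
    """Average seconds to create, start and join one worker built by `factory`."""
    start = time.perf_counter()
    for _ in range(count):
        worker = factory(target=noop)
        worker.start()
        worker.join()
    return (time.perf_counter() - start) / count


if __name__ == "__main__":
    thread_cost = time_start_join(threading.Thread)
    process_cost = time_start_join(multiprocessing.Process)
    print(f"thread:  {thread_cost * 1e6:.0f} µs per start/join")
    print(f"process: {process_cost * 1e6:.0f} µs per start/join")
```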
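VI. Choosing Between Multiprocessing and Multithreading

| Scenario | Recommended | Reason |
|---|---|---|
| CPU-bound tasks | Multiprocessing | Uses all CPU cores; not limited by the GIL |
| I/O-bound tasks | Multithreading | Low startup cost and easy communication; threads release the GIL while waiting on I/O |
| High stability requirements | Multiprocessing | A crash in one process does not affect the others |
| Memory efficiency is a priority | Multithreading | Threads share one address space, so memory use is more efficient |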
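For the I/O-bound row, `ThreadPoolExecutor` has the same interface as `ProcessPoolExecutor`. Switching between the two is therefore often a one-line change. In the sketch below, `time.sleep` stands in for real network or disk waits; this is an assumption, since real I/O timing is less predictable. The five 0.2-second waits overlap, so the whole batch takes roughly 0.2 seconds rather than 1 second:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(n):
    # Stand-in for an I/O wait; like real blocking I/O, sleep releases the GIL
    time.sleep(0.2)
    return n


if __name__ == "__main__":
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=5) as executor:
        io_results = list(executor.map(fake_io, range(5)))
    elapsed = time.perf_counter() - start
    print(io_results, f"{elapsed:.2f}s")
```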
VII. Practical Examples

1. Parallel Computation

```python
from concurrent.futures import ProcessPoolExecutor
import time


def compute(n):
    """Compute the sum of 1 to n with a pure-Python loop (CPU-bound)."""
    total = 0
    for i in range(1, n + 1):
        total += i
    return total


if __name__ == "__main__":
    tasks = [100000000, 200000000, 300000000, 400000000]

    # Parallel: each n is summed in a separate worker process
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(compute, tasks))
    end_time = time.time()
    print(f"Results: {results}")
    print(f"Time taken: {end_time - start_time:.2f} seconds")

    # Sequential baseline in the main process, for comparison
    start_time = time.time()
    results_seq = [compute(n) for n in tasks]
    end_time = time.time()
    print(f"Sequential results: {results_seq}")
    print(f"Sequential time taken: {end_time - start_time:.2f} seconds")
```
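The example above runs one independent task per worker. A single large job can also be parallelized by splitting its input into ranges and combining the partial results (data decomposition). The sketch below splits the sum of 1..n into four ranges and checks the result against the closed form n(n+1)/2. The helpers `chunk_bounds` and `partial_sum` are hypothetical:

```python
from concurrent.futures import ProcessPoolExecutor


def partial_sum(bounds):
    """Sum the integers in the inclusive range (lo, hi)."""
    lo, hi = bounds
    return sum(range(lo, hi + 1))


def chunk_bounds(n, parts):
    """Split 1..n into `parts` contiguous inclusive ranges; the last takes the remainder."""
    size = n // parts
    bounds = []
    lo = 1
    for k in range(parts):
        hi = n if k == parts - 1 else lo + size - 1
        bounds.append((lo, hi))
        lo = hi + 1
    return bounds


if __name__ == "__main__":
    n = 10_000_000
    with ProcessPoolExecutor(max_workers=4) as executor:
        total = sum(executor.map(partial_sum, chunk_bounds(n, 4)))
    print(total == n * (n + 1) // 2)  # True
```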
2. Processing Files in Parallel

```python
import multiprocessing


def process_file(filename):
    """Read a file and count its words."""
    with open(filename, 'r', encoding='utf-8') as f:
        content = f.read()
    word_count = len(content.split())
    print(f"File {filename} has {word_count} words")
    return word_count


if __name__ == "__main__":
    # These files must exist in the current working directory
    files = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt']

    with multiprocessing.Pool(processes=4) as pool:
        # map() returns results in the same order as `files`
        results = pool.map(process_file, files)

    print(f"Total word count: {sum(results)}")
```
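The file-processing example needs `file1.txt` through `file4.txt` to exist. To try the same pattern without preparing any files, the sketch below first writes a few sample files to a temporary directory. The file contents and the `count_words` helper are made up for illustration. It then counts their words with `Pool.map`:

```python
import multiprocessing
import os
import tempfile


def count_words(path):
    with open(path, "r", encoding="utf-8") as f:
        return len(f.read().split())


if __name__ == "__main__":
    samples = ["one two three", "four five", "six", "seven eight nine ten"]
    with tempfile.TemporaryDirectory() as tmpdir:
        paths = []
        for i, text in enumerate(samples):
            path = os.path.join(tmpdir, f"sample{i}.txt")
            with open(path, "w", encoding="utf-8") as f:
                f.write(text)
            paths.append(path)

        with multiprocessing.Pool(processes=4) as pool:
            counts = pool.map(count_words, paths)  # order matches `paths`

    print(counts, sum(counts))  # [3, 2, 1, 4] 10
```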