多进程的基本概念

在Python中,进程是程序执行的独立单元。多进程编程允许程序同时执行多个任务,充分利用多核CPU的性能。与多线程不同,多进程不受GIL(全局解释器锁)的限制,可以真正实现并行执行。

进程的创建与启动

使用multiprocessing模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import multiprocessing
import time

def task(name):
print(f"Task {name} started")
time.sleep(2)
print(f"Task {name} completed")

# 创建进程
process1 = multiprocessing.Process(target=task, args=('A',))
process2 = multiprocessing.Process(target=task, args=('B',))

# 启动进程
process1.start()
process2.start()

# 等待进程完成
process1.join()
process2.join()

print("All tasks completed")

使用继承方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import multiprocessing
import time

class MyProcess(multiprocessing.Process):
def __init__(self, name):
super().__init__()
self.name = name

def run(self):
print(f"Task {self.name} started")
time.sleep(2)
print(f"Task {self.name} completed")

# 创建进程
process1 = MyProcess('A')
process2 = MyProcess('B')

# 启动进程
process1.start()
process2.start()

# 等待进程完成
process1.join()
process2.join()

print("All tasks completed")

进程间通信

队列(Queue)

队列是进程间通信的一种方式,它允许进程之间安全地传递数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import multiprocessing
import time

def producer(queue):
for i in range(5):
time.sleep(1)
queue.put(i)
print(f"Produced: {i}")

def consumer(queue):
for _ in range(5):
item = queue.get()
print(f"Consumed: {item}")
time.sleep(1)

# 创建队列
queue = multiprocessing.Queue()

# 创建进程
process1 = multiprocessing.Process(target=producer, args=(queue,))
process2 = multiprocessing.Process(target=consumer, args=(queue,))

# 启动进程
process1.start()
process2.start()

# 等待进程完成
process1.join()
process2.join()

print("All tasks completed")

管道(Pipe)

管道是进程间通信的另一种方式,它允许两个进程之间双向通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import multiprocessing
import time

def sender(pipe):
for i in range(5):
time.sleep(1)
pipe.send(i)
print(f"Sent: {i}")
pipe.close()

def receiver(pipe):
while True:
try:
item = pipe.recv()
print(f"Received: {item}")
except EOFError:
break

# 创建管道
parent_conn, child_conn = multiprocessing.Pipe()

# 创建进程
process1 = multiprocessing.Process(target=sender, args=(child_conn,))
process2 = multiprocessing.Process(target=receiver, args=(parent_conn,))

# 启动进程
process1.start()
process2.start()

# 等待进程完成
process1.join()
process2.join()

print("All tasks completed")

共享内存(Value和Array)

共享内存允许进程直接访问同一块内存区域,提高数据传输的效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import multiprocessing
import time

def increment(counter, array):
for i in range(1000000):
counter.value += 1
array[i % len(array)] += 1

def decrement(counter, array):
for i in range(1000000):
counter.value -= 1
array[i % len(array)] -= 1

# 创建共享内存
counter = multiprocessing.Value('i', 0)
array = multiprocessing.Array('i', [0] * 10)

# 创建进程
process1 = multiprocessing.Process(target=increment, args=(counter, array))
process2 = multiprocessing.Process(target=decrement, args=(counter, array))

# 启动进程
process1.start()
process2.start()

# 等待进程完成
process1.join()
process2.join()

print(f"Final counter value: {counter.value}")
print(f"Final array values: {list(array)}")

进程池

使用进程池可以更有效地管理进程,避免频繁创建和销毁进程的开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from concurrent.futures import ProcessPoolExecutor
import time

def task(name):
print(f"Task {name} started")
time.sleep(2)
print(f"Task {name} completed")
return f"Result of task {name}"

# 创建进程池
with ProcessPoolExecutor(max_workers=3) as executor:
# 提交任务
futures = [executor.submit(task, i) for i in range(5)]

# 获取结果
for future in futures:
result = future.result()
print(f"Received: {result}")

print("All tasks completed")

多进程的优缺点

优点

  1. 真正的并行执行:多进程不受GIL的限制,可以充分利用多核CPU的性能。
  2. 更好的稳定性:一个进程崩溃不会影响其他进程。
  3. 更大的内存空间:每个进程有自己的内存空间,避免了内存共享的问题。

缺点

  1. 启动开销大:创建进程的开销比创建线程大。
  2. 通信开销大:进程间通信的开销比线程间通信大。
  3. 资源消耗多:每个进程都需要一定的内存和CPU资源。

多进程与多线程的选择

场景 推荐使用 原因
CPU密集型任务 多进程 充分利用多核CPU,不受GIL限制
I/O密集型任务 多线程 启动开销小,通信方便
稳定性要求高 多进程 一个进程崩溃不影响其他进程
内存使用要求高 多线程 线程共享内存,内存使用更高效

实际应用示例

并行计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from concurrent.futures import ProcessPoolExecutor
import time

def compute(n):
"""计算从1到n的和"""
total = 0
for i in range(1, n+1):
total += i
return total

# 要计算的任务
tasks = [100000000, 200000000, 300000000, 400000000]

# 使用进程池并行计算
start_time = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(compute, tasks))
end_time = time.time()

print(f"Results: {results}")
print(f"Time taken: {end_time - start_time:.2f} seconds")

# 顺序计算作为对比
start_time = time.time()
results_seq = [compute(n) for n in tasks]
end_time = time.time()

print(f"Sequential results: {results_seq}")
print(f"Sequential time taken: {end_time - start_time:.2f} seconds")

并行处理文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import multiprocessing
import os

def process_file(filename):
"""处理文件,统计单词数"""
with open(filename, 'r', encoding='utf-8') as f:
content = f.read()
word_count = len(content.split())
print(f"File {filename} has {word_count} words")
return word_count

# 要处理的文件
files = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt']

# 使用进程池并行处理
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(process_file, files)

print(f"Total word count: {sum(results)}")

总结

Python的多进程编程是一个强大的工具,它可以帮助你充分利用多核CPU的性能,提高程序的执行效率。通过本文的介绍,你应该已经掌握了Python多进程编程的基本概念和使用方法。

记住,多进程编程虽然强大,但也带来了一些挑战,如进程间通信的开销和资源消耗。在使用多进程时,你应该根据具体的场景选择合适的进程间通信方式,并注意避免常见的陷阱。

对于CPU密集型任务,多进程是一个不错的选择,可以充分利用多核CPU的性能。对于I/O密集型任务,多线程可能更适合,因为它的启动开销小,通信方便。

通过合理使用多进程和多线程,你可以编写出更高效、更响应的Python程序。