Skip to content

11. 进程池

python
import time
from concurrent.futures import ProcessPoolExecutor


def task(num):
    print(f"Task {num} is running")
    time.sleep(2)


if __name__ == "__main__":
    pool = ProcessPoolExecutor(max_workers=4)
    for i in range(8):
        pool.submit(task, i)

    print("All tasks submitted.")  # 正常往下走
    print("Main process is doing other work...")

# 输出结果:
# All tasks submitted.
# Main process is doing other work...
# Task 0 is running
# Task 1 is running
# Task 2 is running
# Task 3 is running
# Task 4 is running
# Task 5 is running
# Task 6 is running
# Task 7 is running

如果想要等待所有任务完成后再继续执行,可以使用 pool.shutdown(wait=True)

注意:如果在进程池中要使用进程锁,则需要基于 Manager 中的 Lock 和 RLock 来实现进程间的锁机制。

进程池与锁配合使用

在进程池中使用锁时,不能直接使用 multiprocessing.RLock(),而需要通过 multiprocessing.Manager() 来创建锁对象:

python
import time
import multiprocessing
from concurrent.futures.process import ProcessPoolExecutor


def task(lock):
    print("开始")
    # lock.acquire()
    # lock.release()
    with lock:
        # 假设文件中保存的内容就是一个值: 10
        with open("f1.txt", mode="r", encoding="utf-8") as f:
            current_num = int(f.read())

        print("排队抢票了")
        time.sleep(1)
        current_num -= 1

        with open("f1.txt", mode="w", encoding="utf-8") as f:
            f.write(str(current_num))


if __name__ == "__main__":
    pool = ProcessPoolExecutor()
    # lock_object = multiprocessing.RLock() # 不能使用
    manager = multiprocessing.Manager()
    lock_object = manager.RLock()
    for i in range(10):
        pool.submit(task, lock_object)

关键要点:

  • 在进程池中,普通的 multiprocessing.RLock() 无法正常工作
  • 必须使用 multiprocessing.Manager() 创建管理器
  • 通过管理器的 manager.RLock() 方法创建可在进程池中共享的锁
  • 可以使用 with lock: 语法自动获取和释放锁,更加简洁和安全

构建时间:11/21/2025, 1:28:39 PM | 本博客内容均为自己学习,如内容涉及侵权,请联系邮箱:pangzl0215@163.com