❝Python入门第三十课,主要是学习了几个控制进程的方式,如何添加/释放进程锁,阻塞进程,终止进程,以及守护进程。
为了防止多个进程同时打印或操作同一资源的数据错乱,可以使用 Lock。
数据错乱演示:
import timefrom multiprocessing import Processdefspeak():for index in range(10): print('好好', end='') print('学习', end='') print('天天', end='') print('向上') time.sleep(1)defstudy():for index in range(15): print('A', end='') print('B', end='') print('C', end='') print('D') time.sleep(1)if __name__ == '__main__': print('我是主进程中的【第一行】打印') p1 = Process(target=speak) p2 = Process(target=study) p1.start() p2.start() print('我是主进程中的【最后一行】打印')多次运行上面的代码,会出现(不是每次都出现)类似如下的异常:

❏ 传统用法:
lock.acquire() # 上锁# ... 临界区代码 ...lock.release() # 释放锁❏ 上下文管理器用法(推荐):
with lock:# 自动上锁,退出块时自动释放# 好处:即便发生异常也能保证释放锁,避免死锁上面的【数据错乱演示】代码增加 Lock 调整后的代码如下:
import timefrom multiprocessing import Process, Lockdefspeak(process_lock):for index in range(10):# 上锁:如果锁是空的,立刻上锁,继续往下执行;如果锁被别人拿着,当前进程会阻塞等待 process_lock.acquire() print('好好', end='') print('学习', end='') print('天天', end='') print('向上')# 释放锁:acquire 和 release 必须成对出现,否则会永远卡主(死等) process_lock.release() time.sleep(1)defstudy(process_lock):for index in range(15):# with process_lock: 会自动完成两件事:# (1)进入前:自动执行 process_lock.acquire() 上锁# (2)离开后:自动执行 process_lock.release() 释放锁# 好处:即便代码里发生异常,也能保证释放锁,避免“卡死”with process_lock: print('A', end='') print('B', end='') print('C', end='') print('D') time.sleep(1)if __name__ == '__main__': print('我是主进程中的【第一行】打印') lock = Lock() p1 = Process(target=speak, args=(lock,)) p2 = Process(target=study, args=(lock,)) p1.start() p2.start() print('我是主进程中的【最后一行】打印')传统 Lock 在面对多次上锁时,处理不当可能会产生死锁状态,比如:连续两次『上锁』期间『没有释放前一次的锁』,就会造成死锁。原因如下:
❝上锁的逻辑是:如果锁是空的,立刻上锁,继续往下执行;如果锁被别人拿着,当前进程会阻塞等待。
所以,第一次上锁后,锁被占用,第二次上锁之前,上一次的锁没有被释放,继续上锁就会导致当前进程一直处于阻塞等待状态。
死锁演示代码:(下面只是为了演示死锁效果,实际代码中不可能连续两行进行上锁,但是实际业务中多人开发代码嵌套复用,就有可能出现连续上锁导致死锁的场景)
import timefrom multiprocessing import Process, Lockdefspeak(process_lock):for index in range(10):# 上锁:如果锁是空的,立刻上锁,继续往下执行;如果锁被别人拿着,当前进程会阻塞等待 process_lock.acquire() process_lock.acquire() # 这一行就会造成死锁,因为它会一直等上一行上的锁被释放 print('好好', end='') print('学习', end='') print('天天', end='') print('向上')# 释放锁:acquire 和 release 必须成对出现,否则会永远卡主(死等) process_lock.release() process_lock.release() time.sleep(1)defstudy(process_lock):for index in range(15):# with process_lock: 会自动完成两件事:# (1)进入前:自动执行 process_lock.acquire() 上锁# (2)离开后:自动执行 process_lock.release() 释放锁# 好处:即便代码里发生异常,也能保证释放锁,避免“卡死”with process_lock: print('A', end='') print('B', end='') print('C', end='') print('D') time.sleep(1)if __name__ == '__main__': print('我是主进程中的【第一行】打印') lock = Lock() p1 = Process(target=speak, args=(lock,)) p2 = Process(target=study, args=(lock,)) p1.start() p2.start() print('我是主进程中的【最后一行】打印')上面代码运行结果如下:

针对这个问题解决办法就是使用RLock替代Lock。
对于上面死锁的问题,解决办法只需要在定义进程的时候,把传统的Lock替换成RLock即可,完整示例代码如下:
import timefrom multiprocessing import Process, RLockdefspeak(process_lock):for index in range(10):# 上锁:如果锁是空的,立刻上锁,继续往下执行;如果锁被别人拿着,当前进程会阻塞等待 process_lock.acquire() process_lock.acquire() # 这一行就会造成死锁,因为它会一直等上一行上的锁被释放 print('好好', end='') print('学习', end='') print('天天', end='') print('向上')# 释放锁:acquire 和 release 必须成对出现,否则会永远卡主(死等) process_lock.release() # 注意:上了几次锁,这里就要释放几次锁 process_lock.release() time.sleep(1)defstudy(process_lock):for index in range(15):# with process_lock: 会自动完成两件事:# (1)进入前:自动执行 process_lock.acquire() 上锁# (2)离开后:自动执行 process_lock.release() 释放锁# 好处:即便代码里发生异常,也能保证释放锁,避免“卡死”with process_lock: print('A', end='') print('B', end='') print('C', end='') print('D') time.sleep(1)if __name__ == '__main__': print('我是主进程中的【第一行】打印') lock = RLock() p1 = Process(target=speak, args=(lock,)) p2 = Process(target=study, args=(lock,)) p1.start() p2.start() print('我是主进程中的【最后一行】打印')注意:如果是传统上锁的写法,上了几次锁,就必须对应释放几次锁。
process_lock.acquire()process_lock.acquire() # ... 其他代码 ...# 注意:上了几次锁,这里就要释放几次锁process_lock.release()process_lock.release()❏ 作用
阻塞当前进程(通常是主进程),直到调用join的那个进程执行完毕。
❏ 语法
join(timeout),参数timeout为超时时间(单位:秒),该参数是可选的,表示等多久。
如果时间到了进程还没结束,主进程就不等了,会继续执行。
❏ 注意
join必须在start()之后调用。p.join()不是让进程 p 等,而是让“执行这行 join 代码的那个进程”去等,等的是 p 这个进程。timeout所指定的时间后,进程并不会终止,只是“不再等”了。import osimport timefrom multiprocessing import Processdefspeak():for index in range(10): print(f'我在说话{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}') time.sleep(1)defstudy():for index in range(15): print(f'我在学习{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}') time.sleep(1)if __name__ == '__main__': print('我是主进程中的【第一行】打印') p1 = Process(target=speak) p2 = Process(target=study) p1.start() p1.join(5) # 让主进程等 p1 5秒钟 p2.start()# p1.join() # 让主进程等 p1 进程执行完毕后,主进程在继续执行# p2.join() # 让主进程等 p2 进程执行完毕后,主进程再继续执行 print('我是主进程中的【最后一行】打印')建议:尝试在不同的位置使用join,并观察运行结果。
❏ 作用
向操作系统申请强制终止进程。
❏ 注意
使用terminate终止进程, 不会执行finally代码块。
❏ 辅助方法
is_alive()可用于查看进程是否还“活着”。
import osimport timefrom multiprocessing import Processdefspeak():try:for index in range(10): print(f'我在说话{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}') time.sleep(1)finally: print('我是finally里的逻辑')defstudy():for index in range(15): print(f'我在学习{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}') time.sleep(1)if __name__ == '__main__': print('我是主进程中的【第一行】打印') p1 = Process(target=speak) p2 = Process(target=study) p1.start() p2.start()# 3 秒后终止 p1 进程 time.sleep(3) print('xxxxx 终止了p1进程....')# 向操作系统申请强制终止p1进程 p1.terminate()# 等操作系统彻底终止掉了p1进程 p1.join()# 查看p1进程是否还“活着” print(p1.is_alive()) # 由于p1.terminate() 方法是异步的,需要先调用 p1.join() 在检查进程存活状态 print('我是主进程中的【最后一行】打印')Python中守护进程是一种“依附于主进程存在的子进程”,一旦主进程结束,它就会被自动终止。
它的核心行为是『自动结束』:一旦父进程代码执行完毕,守护进程就会立即被强制终止,无论它自身的任务是否完成。
基础示例:
import timefrom multiprocessing import Processdefbackground_task(): index: int = 1whileTrue: print(f'[Daemon] 守护进程工作中{index}') time.sleep(1) index += 1defmain(): p = Process(target=background_task, daemon=True) p.start()for index in range(3): print(f'[Main] 主进程工作了 {index + 1} 秒') time.sleep(1) print(f'[Main] 主进程工作结束@@')if __name__ == '__main__': main()运行上面的代码,会发现程序在3秒后就退出了,守护进程也随之被强制终止,具体效果如下:

注意点:
start之前,start()之后,不能再设置daemon。❝有时候,即使父进程代码执行完了,我们看起来主进程还没退出,这通常是因为还有其他的非守护进程在运行。主进程会等待所有非守护进程结束,而守护进程则会随着父进程(此时是代码执行结束的主进程)被“清理”掉。
守护进程适用于非关键、辅助性、可随时丢弃的任务。
注意:不适用于任何关键数据操纵、必须完成的结算任务、需要可靠关闭的网络服务。
import osimport timefrom multiprocessing import Process# 定义【守护进程】业务逻辑defmonitor():whileTrue:try:with open('log.txt', 'r', encoding='utf-8') as f: lines = sum(1for _ in f)except FileNotFoundError: lines = 0 print(f'我是【守护进程({os.getpid()})】,log.txt 共有{lines}行') time.sleep(1)#定义【非守护进程】业务逻辑deftest():for index in range(30): print(f'我是test({os.getpid()}), {index}') time.sleep(1)if __name__ == '__main__': print(f'我是主进程({os.getpid()})中的【第一行】代码')# 创建 p1 守护进程 p1 = Process(target=monitor, daemon=True)# 创建 p2 非守护进程 p2 = Process(target=test) p1.start() p2.start()# 记录日志:向文件中写入数据with open('log.txt', 'a', encoding='utf-8') as f:for index in range(10): f.write(f'测试日志{index}\n') f.flush() time.sleep(1) print(f'我是主进程({os.getpid()})中的【最后一行】代码')