python多进程共享一个可操作的变量, 如何保证原子操作?-灵析社区

中年复健狗

python多进程共享一个可操作的变量, 如何保证原子操作? **我的需求** 目前有多组数据需要执行计算型任务, 每执行完一组数据就要给java通知并传递生成的结果文件, 并且我要在把所有任务都执行完的时候(也就是最后一组数据执行完的时候)告诉java本轮次数据全部执行完毕, java那边就要去整体做数据库入库的操作。 **我的设想** 多进程之间维护一个整型数值, 在执行完一组数据的时候自增1, 并在通知java的方法中去比较这个整型数值和总任务数量, 相等就代表全部完毕。(这中间不考虑计算任务执行失败的情况) **面临的情况** 我通过 `multiprocessing` 模块中的 `Manager` 去声明了一个整型变量, 然后给每个进程传递了过去, 在执行自增的时候, 出现了多个进程读取到同一个值的情况(详细可以看下面的输出, 或者自行执行下面的demo), 无法满足数值的读写原子, 我尝试了加锁, 但是没有生效。 **这是我抽象出来的demo** from concurrent.futures import ProcessPoolExecutor import ctypes from multiprocessing import Manager, Lock from multiprocessing.managers import ValueProxy import os m = Manager().Value(ctypes.c_int, 0) def calc_number(x: int, y: int, _m: "ValueProxy", total_tasks: int): """模拟耗时任务函数""" # 模拟耗时计算 res = x**y # with Lock(): 加锁也不管用... # 多进程共享变量, 用于比较总任务数量 with Lock(): _m.value += 1 # 当总任务数量和_m.value相等的时候, 通知第三方任务全部做完了 if _m.value == total_tasks: print(True) print(f"m_value: {_m.value}, p_id: {os.getpid()}, res: {res}") def main(): # 以下假设有8组任务 t1 = (100, 200, 300, 400, 500, 600, 700, 800) t2 = (80, 70, 60, 50, 40, 30, 20, 10) len_t = len(t1) # 多进程执行cpu耗时性任务 with ProcessPoolExecutor(max_workers=len_t) as executor: {executor.submit(calc_number, x, y, m, len_t) for x, y in zip(t1, t2)} if __name__ == "__main__": main() 这是我目前的demo,从我的业务代码中抽象出来的。 这是代码的输出打印(很明显是错误的): m_value: 2, p_id: 14873, res: 118059162071741130342400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 m_value: 2, p_id: 14877, res: 12676506002282294014967032053760000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 m_value: 3, p_id: 14875, res: 42391158275216203514294433201000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 m_value: 3, p_id: 14872, res: 10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 m_value: 4, p_id: 14883, res: 797922662976120010000000000000000000000000000000000000000 m_value: 5, p_id: 14879, res: 909494701772928237915039062500000000000000000000000000000000000000000000000000000000000000000000000000000000 m_value: 5, p_id: 14881, res: 221073919720733357899776000000000000000000000000000000000000000000000000000000000000 m_value: 6, p_id: 14885, res: 107374182400000000000000000000 正确的输出应该 `m_value` 是 `1,2,3,4,5,6,7,8` 以此打印出来的。 希望对此颇有研究的大佬指点迷津, 或者给出其他可行方案。最好贴出代码, 不胜感激。 * * * **问题已解决:** 锁多次创建没有保证是同一把锁是主因

阅读量:22

点赞量:0

问AI
from concurrent.futures import ProcessPoolExecutor import ctypes from multiprocessing import Manager, Lock import os # 创建 Manager 和 Lock manager = Manager() m = manager.Value(ctypes.c_int, 0) lock = manager.Lock() def calc_number(x: int, y: int, _m, total_tasks: int, _lock): """模拟耗时任务函数""" # 模拟耗时计算 res = x ** y # 用锁来保证原子操作 with _lock: _m.value += 1 current_value = _m.value # 当总任务数量和_m.value相等的时候, 通知第三方任务全部做完了 if current_value == total_tasks: print(True) print(f"m_value: {current_value}, p_id: {os.getpid()}, res: {res}") def main(): # 任务参数 t1 = (100, 200, 300, 400, 500, 600, 700, 800) t2 = (80, 70, 60, 50, 40, 30, 20, 10) len_t = len(t1) # 多进程执行任务 with ProcessPoolExecutor(max_workers=len_t) as executor: for x, y in zip(t1, t2): executor.submit(calc_number, x, y, m, len_t, lock) if __name__ == "__main__": main()