python中分布式进程的详细介绍(附示例)
本篇文章给大家带来的内容是关于php中的sapi是什么?如何实现?(图文),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。
在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。
Python的 multiprocessing 模块不但支持多进程, 其中 managers 子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务。
BaseManager: 提供了不同机器进程之间共享数据的一种方法;
立即学习“Python免费学习笔记(深入)”;
(重要的点: ip:port)
# task_master.pyimport randomfrom multiprocessing import freeze_supportfrom queue import Queuefrom multiprocessing.managers import BaseManager# 1. 创建需要的队列# task_queue:发送任务的队列# coding=utf-8import random,timefrom queue import Queuefrom multiprocessing.managers import BaseManagerfrom multiprocessing import freeze_supporttask_queue = Queue() # 发送任务的队列:result_queue = Queue() # 接收结果的队列:class QueueManager(BaseManager): # 从BaseManager继承的QueueManager: pass# windows下运行def return_task_queue(): global task_queue return task_queue # 返回发送任务队列def return_result_queue (): global result_queue return result_queue # 返回接收结果队列def test(): # 把两个Queue都注册到网络上, callable参数关联了Queue对象,它们用来进行进程间通信,交换对象 #QueueManager.register('get_task_queue', callable=lambda: task_queue) #QueueManager.register('get_result_queue', callable=lambda: result_queue) QueueManager.register('get_task_queue', callable=return_task_queue) QueueManager.register('get_result_queue', callable=return_result_queue) # 绑定端口4000, 设置验证码'sheenstar': #manager = QueueManager(address=('', 4000), authkey=b'sheenstar') # windows需要写ip地址 manager = QueueManager(address=('192.168.1.160', 4000), authkey=b'sheenstar') manager.start() # 启动Queue: # 获得通过网络访问的Queue对象: task = manager.get_task_queue() result = manager.get_result_queue() for i in range(13): # 放几个任务进去: n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 从result队列读取结果: print('Try get results...') for i in range(13): r = result.get(timeout=10) print('Result: %s' % r) # 关闭: manager.shutdown() print('master exit.')if __name__=='__main__': freeze_support() print('start!') test()
运行程序,会等待执行结果10s,如果没有worker端获取任务,返回结果,程序将报错。
当我们在一台机器上写多进程程序时,创建的 Queue 可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的 task_queue 进行操作,那样就绕过了QueueManager 的封装,必须通过manager.get_task_queue()获得的 Queue 接口添加。
# coding=utf-8import time, sysfrom queue import Queuefrom multiprocessing.managers import BaseManager# 创建类似的QueueManager:class QueueManager(BaseManager): pass# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:QueueManager.register('get_task_queue')QueueManager.register('get_result_queue')# 连接到服务器,也就是运行task_master.py的机器:server_addr = '192.168.1.160'print('Connect to server %s...' % server_addr)# 端口和验证码注意保持与task_master.py设置的完全一致:m = QueueManager(address=(server_addr, 4000), authkey=b'sheenstar')# 从网络连接:try: m.connect()except: print('请先启动task_master.py!') #sys.exit("sorry, goodbye!");# 获取Queue的对象:task = m.get_task_queue()result = m.get_result_queue()# 从task队列取任务,并把结果写入result队列:for i in range(13): try: n = task.get() print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except ConnectionResetError as e: print("任务执行结束,自动断开连接")# 处理结束:print('worker exit.')
使用命令行运行程序,结果更直观