PHP前端开发

python中分布式进程的详细介绍(附示例)

百变鹏仔 2小时前 #Python
文章标签 详细介绍

本篇文章给大家带来的内容是关于php中的sapi是什么?如何实现?(图文),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。

Python的 multiprocessing 模块不但支持多进程, 其中 managers 子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。

通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务。

BaseManager: 提供了不同机器进程之间共享数据的一种方法;

立即学习“Python免费学习笔记(深入)”;

(重要的点: ip:port)
# task_master.pyimport randomfrom multiprocessing import freeze_supportfrom queue import Queuefrom multiprocessing.managers import  BaseManager# 1. 创建需要的队列# task_queue:发送任务的队列# coding=utf-8import random,timefrom queue import Queuefrom multiprocessing.managers import BaseManagerfrom multiprocessing import freeze_supporttask_queue =  Queue()  # 发送任务的队列:result_queue = Queue() # 接收结果的队列:class QueueManager(BaseManager):  # 从BaseManager继承的QueueManager:    pass# windows下运行def return_task_queue():    global task_queue    return task_queue  # 返回发送任务队列def return_result_queue ():    global result_queue    return result_queue # 返回接收结果队列def test():    # 把两个Queue都注册到网络上, callable参数关联了Queue对象,它们用来进行进程间通信,交换对象    #QueueManager.register('get_task_queue', callable=lambda: task_queue)    #QueueManager.register('get_result_queue', callable=lambda: result_queue)    QueueManager.register('get_task_queue', callable=return_task_queue)    QueueManager.register('get_result_queue', callable=return_result_queue)    # 绑定端口4000, 设置验证码'sheenstar':    #manager = QueueManager(address=('', 4000), authkey=b'sheenstar')    # windows需要写ip地址    manager = QueueManager(address=('192.168.1.160', 4000), authkey=b'sheenstar')    manager.start()  # 启动Queue:    # 获得通过网络访问的Queue对象:    task = manager.get_task_queue()    result = manager.get_result_queue()    for i in range(13):   # 放几个任务进去:        n = random.randint(0, 10000)        print('Put task %d...' % n)        task.put(n)    # 从result队列读取结果:    print('Try get results...')    for i in range(13):        r = result.get(timeout=10)        print('Result: %s' % r)    # 关闭:    manager.shutdown()    print('master exit.')if __name__=='__main__':    freeze_support()    print('start!')    test()

运行程序,会等待执行结果10s,如果没有worker端获取任务,返回结果,程序将报错。

当我们在一台机器上写多进程程序时,创建的 Queue 可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的 task_queue 进行操作,那样就绕过了QueueManager 的封装,必须通过manager.get_task_queue()获得的 Queue 接口添加。

# coding=utf-8import time, sysfrom queue import Queuefrom multiprocessing.managers import BaseManager# 创建类似的QueueManager:class QueueManager(BaseManager):    pass# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:QueueManager.register('get_task_queue')QueueManager.register('get_result_queue')# 连接到服务器,也就是运行task_master.py的机器:server_addr = '192.168.1.160'print('Connect to server %s...' % server_addr)# 端口和验证码注意保持与task_master.py设置的完全一致:m = QueueManager(address=(server_addr, 4000), authkey=b'sheenstar')# 从网络连接:try:    m.connect()except:    print('请先启动task_master.py!')    #sys.exit("sorry, goodbye!");# 获取Queue的对象:task = m.get_task_queue()result = m.get_result_queue()# 从task队列取任务,并把结果写入result队列:for i in range(13):    try:        n = task.get()        print('run task %d * %d...' % (n, n))        r = '%d * %d = %d' % (n, n, n*n)        time.sleep(1)        result.put(r)    except ConnectionResetError as e:        print("任务执行结束,自动断开连接")# 处理结束:print('worker exit.')

使用命令行运行程序,结果更直观