并发模式:主动对象
介绍
主动对象模式是一种并发设计模式,它将方法执行与方法调用解耦。此模式的主要目标是通过在单独的线程中执行操作来引入异步行为,同时向客户端提供同步接口。这是通过消息传递、请求队列和调度机制的组合来实现的。
关键部件
- proxy:代表客户端的公共接口。更简单地说,这就是客户端将要交互的内容。它将方法调用转换为对活动对象的请求。
- 调度器:管理请求队列并确定请求执行的顺序。
- servant:包含被调用方法的实际实现。这就是实际计算逻辑的所在。
- 激活队列:存储来自代理的请求,直到调度程序处理它们。
- future/callback:异步计算结果的占位符。
工作流程
- 客户端调用代理上的方法。
- 代理创建请求并将其放入激活队列中。
- 调度程序接收请求并将其转发给servant执行。
- 结果通过 future 对象返回给客户端。
使用案例
执行
假设我们需要进行计算,可能是 api 调用、数据库查询等。我不会实现任何异常处理,因为我太懒了。
def compute(x, y): time.sleep(2) # some time taking task return x + y
没有活动对象模式
下面是我们如何在不使用主动对象模式的情况下处理并发请求的示例。
import threadingimport timedef main(): # start threads directly results = {} def worker(task_id, x, y): results[task_id] = compute(x, y) print("submitting tasks...") thread1 = threading.thread(target=worker, args=(1, 5, 10)) thread2 = threading.thread(target=worker, args=(2, 15, 20)) thread1.start() thread2.start() print("doing other work...") thread1.join() thread2.join() # retrieve results print("result 1:", results[1]) print("result 2:", results[2])if __name__ == "__main__": main()
上述方法的缺点
使用主动对象模式实现
下面是主动对象模式的 python 实现,使用线程和队列来执行与上面相同的操作。我们将一一介绍每个部分:
methodrequest: 封装方法、参数和用于存储结果的 future。
class methodrequest: def __init__(self, method, args, kwargs, future): self.method = method self.args = args self.kwargs = kwargs self.future = future def execute(self): try: result = self.method(*self.args, **self.kwargs) self.future.set_result(result) except exception as e: self.future.set_exception(e)
调度程序:在单独的线程中持续处理来自activation_queue的请求。
import threadingimport queueclass scheduler(threading.thread): def __init__(self): super().__init__() self.activation_queue = queue.queue() self._stop_event = threading.event() def enqueue(self, request): self.activation_queue.put(request) def run(self): while not self._stop_event.is_set(): try: request = self.activation_queue.get(timeout=0.1) request.execute() except queue.empty: continue def stop(self): self._stop_event.set() self.join()
servant:实现实际逻辑(例如,计算方法)。
import timeclass servant: def compute(self, x, y): time.sleep(2) return x + y
proxy:将方法调用转换为请求并返回结果的 future。
from concurrent.futures import futureclass proxy: def __init__(self, servant, scheduler): self.servant = servant self.scheduler = scheduler def compute(self, x, y): future = future() request = methodrequest(self.servant.compute, (x, y), {}, future) self.scheduler.enqueue(request) return future
客户端:异步提交任务并在需要时检索结果。
def main(): # Initialize components scheduler = Scheduler() scheduler.start() servant = Servant() proxy = Proxy(servant, scheduler) # Client makes an asynchronous call print("Submitting tasks...") future1 = proxy.compute(5, 10) future2 = proxy.compute(15, 20) # Perform other tasks while computation is ongoing print("Doing other work...") # Retrieve results print("Result 1:", future1.result()) print("Result 2:", future2.result()) # Shutdown scheduler scheduler.stop()if __name__ == "__main__": main()
优点
缺点
结论
主动对象模式是用于管理多线程环境中的异步操作的强大工具。通过将方法调用与执行分离,它可以确保更好的响应能力、可扩展性和更干净的代码库。虽然它具有一定的复杂性和潜在的性能开销,但它的好处使其成为需要高并发和可预测执行的场景的绝佳选择。然而,它的使用取决于当前的具体问题。与大多数模式和算法一样,不存在一刀切的解决方案。
参考
维基百科 - 活动对象