上下文管理器和守护线程
我正在从上下文管理器启动一个守护线程,该线程应该每秒发送一次心跳,但由于它在线程中运行,因此如果发生异常,它不会终止上下文管理器。当心跳停止时,如何在上下文管理器中引发异常?
from contextlib import contextmanagerfrom threading import Thread, Eventfrom time import sleep@contextmanagerdef plc(): stop_event = Event() try: # Send heartbeat every second hb_t = Thread(target=heartbeat_task, args=(stop_event,), daemon=True) hb_t.start() yield except Exception: raise finally: stop_event.set() hb_t.join() print("Heartbeat stopped")def heartbeat_task(stop_event): value = False while not stop_event.is_set(): value = not value print("Heartbeat: " + str(value)) sleep(1)def main(): with plc(): while True: print("Program running") sleep(5)if __name__ == '__main__': main()
我很难找到这方面的例子。
感谢您的帮助!
正确答案
更新
我已经修改了代码,使其与您发布的代码更加一致。但是:
您提供的代码不一致:heartbeat_task 传递了一个事件,如果设置该事件将导致函数返回。但只有当使用 with plc(): 创建的函数 main 中的上下文管理器退出时才会设置它,而这是永远不会的。如果您希望 heartbeat_task 抛出的任何异常将强制上下文管理器退出,然后在函数 plc 中捕获,那么调用 stop_event.set() 的意义何在?如果根据定义,我们仅在 heartbeat_task 不再存在时才到达这里由于异常而运行?
因此,要么您希望 heartbeat_task 无限期地运行,直到引发异常(在这种情况下,没有“停止”事件的意义),要么您希望能够在存在某些条件时停止 heartbeat_task,但没有这样做的代码。出于演示目的,我假设 main 将有权访问 stop_event 事件,并在某些情况下对其进行设置。否则,它会一直运行,直到检测到 heartbeat_task 不再运行,可能是因为它引发了异常(它正在执行无限循环,所以如果尚未设置停止事件,它怎么可能终止?)。剩下的就是为什么您需要使用上下文管理器。稍后我将提出一个替代方案。
如果您使用多线程池(我们只需要池中的一个线程),那么主线程捕获提交到池的任务抛出的异常就变得很简单:当 multiprocessing.pool.threadpool.apply_async 被调用时返回 multiprocessing.pool.asyncresult 实例,表示未来的完成。当在此实例上调用 get 方法时,您可以从辅助函数 (heartbeat_task) 获取返回值,或者重新引发辅助函数引发的任何异常。但是我们也可以使用 wait 方法来等待提交任务的完成或经过的时间。然后我们可以使用 ready 方法测试等待 5 秒后提交的任务是否真正完成(由于异常或返回)。如果任务仍在运行,那么我们可以告诉它停止。在此演示中,我强制任务在大约 7 秒后引发异常:
from contextlib import contextmanagerfrom threading import eventfrom multiprocessing.pool import threadpoolfrom time import sleep@contextmanagerdef plc(): stop_event = event() pool = threadpool(1) # send heartbeat every second async_result = pool.apply_async(heartbeat_task, args=(stop_event,)) yield stop_event, async_result # we only return here if the task is no longer running try: # see if task threw an exception and if so, catch it: async_result.get() except exception as e: print("got exception:", e) finally: pool.close() pool.join() print("heartbeat stopped")def heartbeat_task(stop_event): # for demo purposes, we will force an exception to occur # after approximately 7 seconds: value = false n = 0 while not stop_event.is_set(): value = not value print("heartbeat: " + str(value)) sleep(1) n += 1 if n == 7: raise exception('oops!')def main(): with plc() as tpl: stop_event, async_result = tpl # this function could forcibly cause the heartbeat_task # to complete by calling stop_event.set() # loop while the task is still running while not async_result.ready(): """ if some_condition: stop_event.set() break """ print("program running") # sleep for 5 seconds or until heartbeat_task terminates: async_result.wait(5)if __name__ == '__main__': main()
打印:
program runningheartbeat: trueheartbeat: falseheartbeat: trueheartbeat: falseheartbeat: trueprogram runningheartbeat: falseheartbeat: truegot exception: oops!heartbeat stopped
使用上下文管理器的替代方法
from threading import Eventfrom multiprocessing.pool import ThreadPoolfrom time import sleepdef heartbeat_task(stop_event): value = False n = 0 while not stop_event.is_set(): value = not value print("Heartbeat: " + str(value)) sleep(1) n += 1 if n == 7: raise Exception('Oops!')def main(): stop_event = Event() pool = ThreadPool(1) async_result = pool.apply_async(heartbeat_task, args=(stop_event,)) # Run as long as heartbeat_task is running: while not async_result.ready(): """ if some_condition: stop_event.set() break """ print("Program running") # Sleep for 5 seconds or until heartbeat_task terminates: async_result.wait(5) # Any exception thrown in heartbeat_task will be rethrown and caught here: try: async_result.get() except Exception as e: print("Got exception:", e) finally: pool.close() pool.join()if __name__ == '__main__': main()