PHP前端开发

上下文管理器和守护线程

百变鹏仔 1天前 #Python
文章标签 上下文
问题内容

我正在从上下文管理器启动一个守护线程,该线程应该每秒发送一次心跳,但由于它在线程中运行,因此如果发生异常,它不会终止上下文管理器。当心跳停止时,如何在上下文管理器中引发异常?

from contextlib import contextmanagerfrom threading import Thread, Eventfrom time import sleep@contextmanagerdef plc():    stop_event = Event()    try:        # Send heartbeat every second        hb_t = Thread(target=heartbeat_task,                      args=(stop_event,),                      daemon=True)        hb_t.start()        yield    except Exception:        raise    finally:        stop_event.set()        hb_t.join()        print("Heartbeat stopped")def heartbeat_task(stop_event):    value = False        while not stop_event.is_set():        value = not value        print("Heartbeat: " + str(value))        sleep(1)def main():    with plc():        while True:            print("Program running")            sleep(5)if __name__ == '__main__':    main()

我很难找到这方面的例子。

感谢您的帮助!


正确答案


更新

我已经修改了代码,使其与您发布的代码更加一致。但是:

您提供的代码不一致:heartbeat_task 传递了一个事件,如果设置该事件将导致函数返回。但只有当使用 with plc(): 创建的函数 main 中的上下文管理器退出时才会设置它,而这是永远不会的。如果您希望 heartbeat_task 抛出的任何异常将强制上下文管理器退出,然后在函数 plc 中捕获,那么调用 stop_event.set() 的意义何在?如果根据定义,我们仅在 heartbeat_task 不再存在时才到达这里由于异常而运行?

因此,要么您希望 heartbeat_task 无限期地运行,直到引发异常(在这种情况下,没有“停止”事件的意义),要么您希望能够在存在某些条件时停止 heartbeat_task,但没有这样做的代码。出于演示目的,我假设 main 将有权访问 stop_event 事件,并在某些情况下对其进行设置。否则,它会一直运行,直到检测到 heartbeat_task 不再运行,可能是因为它引发了异常(它正在执行无限循环,所以如果尚未设置停止事件,它怎么可能终止?)。剩下的就是为什么您需要使用上下文管理器。稍后我将提出一个替代方案。

如果您使用多线程池(我们只需要池中的一个线程),那么主线程捕获提交到池的任务抛出的异常就变得很简单:当 multiprocessing.pool.threadpool.apply_async 被调用时返回 multiprocessing.pool.asyncresult 实例,表示未来的完成。当在此实例上调用 get 方法时,您可以从辅助函数 (heartbeat_task) 获取返回值,或者重新引发辅助函数引发的任何异常。但是我们也可以使用 wait 方法来等待提交任务的完成或经过的时间。然后我们可以使用 ready 方法测试等待 5 秒后提交的任务是否真正完成(由于异常或返回)。如果任务仍在运行,那么我们可以告诉它停止。在此演示中,我强制任务在大约 7 秒后引发异常:

from contextlib import contextmanagerfrom threading import eventfrom multiprocessing.pool import threadpoolfrom time import sleep@contextmanagerdef plc():    stop_event = event()    pool = threadpool(1)    # send heartbeat every second    async_result = pool.apply_async(heartbeat_task, args=(stop_event,))    yield stop_event, async_result    # we only return here if the task is no longer running    try:        # see if task threw an exception and if so, catch it:        async_result.get()    except exception as e:        print("got exception:", e)    finally:        pool.close()        pool.join()        print("heartbeat stopped")def heartbeat_task(stop_event):    # for demo purposes, we will force an exception to occur    # after approximately 7 seconds:    value = false    n = 0    while not stop_event.is_set():        value = not value        print("heartbeat: " + str(value))        sleep(1)        n += 1        if n == 7:            raise exception('oops!')def main():    with plc() as tpl:        stop_event, async_result = tpl        # this function could forcibly cause the heartbeat_task        # to complete by calling stop_event.set()        # loop while the task is still running        while not async_result.ready():            """            if some_condition:                stop_event.set()                break            """            print("program running")            # sleep for 5 seconds or until heartbeat_task terminates:            async_result.wait(5)if __name__ == '__main__':    main()

打印:

program runningheartbeat: trueheartbeat: falseheartbeat: trueheartbeat: falseheartbeat: trueprogram runningheartbeat: falseheartbeat: truegot exception: oops!heartbeat stopped

使用上下文管理器的替代方法

from threading import Eventfrom multiprocessing.pool import ThreadPoolfrom time import sleepdef heartbeat_task(stop_event):    value = False    n = 0    while not stop_event.is_set():        value = not value        print("Heartbeat: " + str(value))        sleep(1)        n += 1        if n == 7:            raise Exception('Oops!')def main():    stop_event = Event()    pool = ThreadPool(1)    async_result = pool.apply_async(heartbeat_task, args=(stop_event,))    # Run as long as heartbeat_task is running:    while not async_result.ready():        """        if some_condition:            stop_event.set()            break        """        print("Program running")        # Sleep for 5 seconds or until heartbeat_task terminates:        async_result.wait(5)    # Any exception thrown in heartbeat_task will be rethrown and caught here:    try:        async_result.get()    except Exception as e:        print("Got exception:", e)    finally:        pool.close()        pool.join()if __name__ == '__main__':    main()