PHP前端开发

Python开发之多个定时任务在单线程下执行的实例分析

百变鹏仔 3小时前 #Python
文章标签 多个

单线程多定时任务 

1、初始版本:

思路:定时器,说白了就是延时执行指定的程序,目前自己重构python里面的定时器不太现实,能力达不到,所以延时操作时还得用到系统定时器,不过我们可以改一下规则;把所有要进行定时操作的程序添加到特定列表中,把列表中定时时间最短程序拿出来,进行threading.Timer(time,callback)绑定,等时间超时触发自定义的callback,执行刚刚列表取出的程序;然后把时间更新,再次把列表中时间最短的程序拿出了,继续threading.Timer绑定,不断的迭代循环;当有新的定时任务加入到列表时,把当前的threading.Timer绑定取消,更新列表中的时间,再次取出最短时间,进行threading.Timer绑定......

代码:


import threadingimport timeclass Timer():    '''单线程下的定时器'''    def __init__(self):        self.queues = []        self.timer = None        self.last_time = time.time()    def start(self):        item = self.get()        if item:            self.timer = threading.Timer(item[0],self.execute)            self.timer.start()    def add(self,item):        print('add',item)        self.flush_time()        self.queues.append(item)        self.queues.sort(key=lambda x:x[0])        if self.timer:            self.timer.cancel()            self.timer = None        self.start()    def get(self):        item = None        if len(self.queues) > 0:            item = self.queues[0]        return item    def pop(self):        item = None        if len(self.queues) > 0:            item = self.queues.pop(0)        return item    def flush_time(self):        curr_time = time.time()        for i in self.queues:            i[0] = i[0] - (curr_time - self.last_time)        self.last_time = curr_time    def execute(self):        # if self.timer:        #     self.timer.cancel()        #     self.timer = None        item = self.pop()        self.flush_time()        if item:            callback = item[1]            args = item[0]            callback(args)        self.start()

执行及输出:


if __name__ == '__main__':    # 检测线程数    def func():        while True:            print(threading.active_count())            time.sleep(1)        f1 = threading.Thread(target=func)    f1.start()        import logging    logging.basicConfig(level=logging.INFO,format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]")    def func1(*args):        logging.info('func1 %s'%args)        # time.sleep(5)        def func2(*args):        logging.info('func2 %s' % args)        # time.sleep(5)    def func3(*args):        logging.info('func3 %s' % args)        # time.sleep(5)        def func4(*args):        logging.info('func4 %s' % args)        # time.sleep(5)        def func5(*args):        logging.info('func5 %s' % args)        # time.sleep(5)            # 测试    t1 = Timer()    logging.info('start')    t1.add([5,func1])    time.sleep(0.5)    t1.add([4,func2])    time.sleep(0.5)    t1.add([3,func3])    time.sleep(0.5)    t1.add([2,func4])    time.sleep(0.5)    t1.add([1,func5])    time.sleep(5)    t1.add([1,func1])    t1.add([2,func2])    t1.add([3,func3])    t1.add([4,func4])    t1.add([5,func5])        # 输出    # 2    # 07/27/2017 10:36:47 [Thursday] start    # add [5, <function>]    # add [4, <function>]    # 3    # add [3, <function>]    # add [2, <function>]    # 3    # add [1, <function>]    # 3    # 07/27/2017 10:36:50 [Thursday] func5 1    # 07/27/2017 10:36:51 [Thursday] func4 0.498349666595459    # 3    # 07/27/2017 10:36:51 [Thursday] func3 0.49782633781433105    # 07/27/2017 10:36:52 [Thursday] func2 0.49848270416259766    # 3    # 07/27/2017 10:36:52 [Thursday] func1 0.48449039459228516    # 2    # 2    # add [1, <function>]    # add [2, <function>]    # add [3, <function>]    # add [4, <function>]    # add [5, <function>]    # 3    # 07/27/2017 10:36:55 [Thursday] func1 0.9990766048431396    # 3    # 07/27/2017 10:36:56 [Thursday] func2 0.9988017082214355    # 3    # 07/27/2017 10:36:57 [Thursday] func3 0.99928879737854    # 07/27/2017 10:36:58 [Thursday] func4 0.9991350173950195    # 3    # 3    # 07/27/2017 10:36:59 [Thursday] func5 0.9988160133361816</function></function></function></function></function></function></function></function></function></function>

执行代码

立即学习“Python免费学习笔记(深入)”;

注:查看代码输出,所有的定时器都按照标定的时间依次执行,非常完美,一切看起来很美好,只是看起来,呵呵哒,当你把func里面的time.sleep(5)启用后,线程数蹭蹭的上来了;原因是上个定时器callback还是执行中,下个定时器已经启动了,这时就又新增了一个线程,哎,失败

 

2、修订版本

思路:利用生成者消费者模型,用到threading.Condition条件变量;强制永远启用的是一个Timer!

代码:


import timeimport threadingimport loggingclass NewTimer(threading.Thread):    '''单线程下的定时器'''    def __init__(self):        super().__init__()        self.queues = []        self.timer = None        self.cond = threading.Condition()    def run(self):        while True:            # print('NewTimer',self.queues)            self.cond.acquire()            item = self.get()            callback = None            if not item:                logging.info('NewTimer wait')                self.cond.wait()            elif item[0]  0:            item = self.queues.pop(0)        return item    def get(self):        item = None        if len(self.queues) &gt; 0:            item = self.queues[0]        return item    def execute(self):        logging.info('NewTimer execute notify')        self.cond.acquire()        self.cond.notify()        self.cond.release()

执行及输出:


if __name__ == '__main__':    def func():        while True:            print(threading.active_count())            time.sleep(1)    f1 = threading.Thread(target=func)    f1.start()    logging.basicConfig(level=logging.INFO,format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]")    newtimer = NewTimer()    newtimer.start()    def func1(*args):        logging.info('func1 %s'%args)        time.sleep(5)    def func2(*args):        logging.info('func2 %s' % args)        time.sleep(5)    def func3(*args):        logging.info('func3 %s' % args)        time.sleep(5)    def func4(*args):        logging.info('func4 %s' % args)        time.sleep(5)    def func5(*args):        logging.info('func5 %s' % args)        time.sleep(5)    newtimer.add([5,func1])    newtimer.add([4,func2])    newtimer.add([3,func3])    newtimer.add([2,func4])    newtimer.add([1,func5])    time.sleep(1)    newtimer.add([1,func1])    newtimer.add([2,func2])    newtimer.add([3,func3])    newtimer.add([4,func4])    newtimer.add([5,func5])# 输出# 2# 07/27/2017 11:26:19 [Thursday] NewTimer wait# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer start sys timer and wait# 07/27/2017 11:26:20 [Thursday] NewTimer execute notify# 4# 07/27/2017 11:26:20 [Thursday] func5 1501125980.2175007# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 3# 3# 3# 3# 3# 07/27/2017 11:26:25 [Thursday] func4 1501125981.2175007# 3# 3# 3# 3# 07/27/2017 11:26:30 [Thursday] func1 1501125981.218279# 3# 3# 3# 3# 3# 3# 07/27/2017 11:26:35 [Thursday] func3 1501125982.2175007# 3# 3# 3# 3# 07/27/2017 11:26:40 [Thursday] func2 1501125982.218279# 3# 3# 3# 3# 3# 07/27/2017 11:26:45 [Thursday] func2 1501125983.2175007# 3# 3# 3# 3# 3# 07/27/2017 11:26:50 [Thursday] func3 1501125983.218279# 3# 3# 3# 3# 3# 07/27/2017 11:26:55 [Thursday] func1 1501125984.2175007# 3# 3# 3# 3# 3# 07/27/2017 11:27:00 [Thursday] func4 1501125984.218279# 3# 3# 3# 3# 3# 07/27/2017 11:27:05 [Thursday] func5 1501125985.218279# 3# 3# 3# 3# 3# 07/27/2017 11:27:10 [Thursday] NewTimer wait

输出

注:这次无论如何测试线程数也不会蹭蹭的上涨,同时可以实现多定时器任务要求;缺点:用到了两线程,没有用到单线程去实现,第二时间精准度问题,需要等待上个定时程序执行完毕,程序才能继续运行