Source code for cup.services.executor

#!/usr/bin/env python
# -*- coding: utf-8 -*
# Copyright: [CUP] - See LICENSE for details.
# Authors: Guannan Ma (@mythmgn),
"""
:description:
    1. Delay-execute something after several seconds

    2. Schedule some tasks in a queue.
"""
from __future__ import print_function
import abc
import calendar
import copy
import datetime
import itertools
import pdb
import threading
import time
import traceback
try:
    # pylint:disable=F0401
    import queue
except ImportError:
    import Queue as queue

import pytz

from cup import log
from cup.services import generator
from cup.util import threadpool

URGENCY_HIGH = 0
URGENCY_NORMAL = 1
URGENCY_LOW = 2


class AbstractExecution(object):
    """
    Abstract base of the execution services.

    It owns the two priority queues (delayed tasks / queued tasks) and the
    thread pool whose workers drain them. Subclasses provide the worker
    loop (:meth:`exec_worker`) and the delayed submission
    (:meth:`delay_exec`).
    """
    # NOTE: the ``__metaclass__`` hook is only honoured by Python 2. Under
    # Python 3 it is a plain class attribute, so the abstract methods below
    # are not enforced there.
    __metaclass__ = abc.ABCMeta

    def __init__(self, delay_exe_thdnum, queue_exec_thdnum, name):
        """
        :param delay_exe_thdnum:
            number of workers serving the delay queue
        :param queue_exec_thdnum:
            number of workers serving the exec queue
        :param name:
            name of the service; None is stored as an empty string
        """
        pool_size = delay_exe_thdnum + queue_exec_thdnum
        self._toal_thdnum = pool_size
        self._delay_exe_thdnu = delay_exe_thdnum
        self._queue_exe_thdnum = queue_exec_thdnum
        self._delay_queue = queue.PriorityQueue()
        self._exec_queue = queue.PriorityQueue()
        # one pool for both worker kinds, min size == max size
        self._thdpool = threadpool.ThreadPool(
            pool_size, pool_size, name='executor_pool'
        )
        # 0 inited, 1 running, 2 stopping
        self._status = 0
        init_msg = (
            'Executor service inited, delay_exec thread num:%d,'
            ' exec thread num:%d'
        ) % (delay_exe_thdnum, queue_exec_thdnum)
        log.info(init_msg)
        if name is None:
            self._name = ''
        else:
            self._name = name

    @abc.abstractmethod
    def exec_worker(self, check_interval, func_queue, worker_name):
        """worker loop that consumes tasks from func_queue"""

    @abc.abstractmethod
    def delay_exec(
        self, delay_time_insec, function, urgency, *args, **kwargs
    ):
        """submit function for execution after delay_time_insec seconds"""
class ExecutionService(AbstractExecution):
    """
    Execution service.

    Functions can either be queued for execution as soon as a worker is
    free (:meth:`queue_exec`) or be queued after a delay
    (:meth:`delay_exec`). A smaller urgency value is served first
    (URGENCY_HIGH < URGENCY_NORMAL < URGENCY_LOW).
    """
    def __init__(
        self, delay_exe_thdnum=3, queue_exec_thdnum=4, name=None
    ):
        """
        :param delay_exe_thdnum:
            number of workers serving delayed tasks
        :param queue_exec_thdnum:
            number of workers serving queued tasks
        :param name:
            name of the service, used in log messages
        """
        AbstractExecution.__init__(
            self, delay_exe_thdnum, queue_exec_thdnum, name
        )
        # Tie-breaker for queue entries of equal urgency. Without it the
        # priority queue falls back to comparing the function objects,
        # which raises TypeError on Python 3.
        self._task_seq = itertools.count()

    def _make_task(self, urgency, function, args, kwargs):
        """build a queue entry: (urgency, sequence number, call info)"""
        return (urgency, next(self._task_seq), (function, args, kwargs))

    def _do_delay_exe(self, task_data):
        """timer callback: move a delayed task into the delay queue"""
        self._delay_queue.put(task_data)

    def delay_exec(
        self, delay_time_insec, function, urgency, *args, **kwargs
    ):
        """
        delay_execute function after delay_time seconds

        You can use urgency := executor.URGENCY_NORMAL, by default

        :param delay_time_insec:
            seconds to wait before the function is put into the delay queue
        :param function:
            callable to execute
        :param urgency:
            priority of the task, smaller runs first
        :param args:
            positional args of function
        :param kwargs:
            key args of function

        :TODO:
            consider about stopping timers when invoking stop function
        """
        log.debug('got delay exec, func:{0}'.format(function))
        # The sequence number is taken at submission time, so delayed tasks
        # of equal urgency are served in submission order.
        task_data = self._make_task(urgency, function, args, kwargs)
        timer = threading.Timer(
            delay_time_insec, self._do_delay_exe,
            [task_data]
        )
        timer.start()

    def queue_exec(self, function, urgency, *argvs, **kwargs):
        """
        execute function in a queue. Functions will be queued in line to
        be scheduled.

        You can use urgency := executor.URGENCY_NORMAL, by default.

        :param function:
            callable to execute
        :param urgency:
            priority of the task, smaller runs first
        """
        self._exec_queue.put(
            self._make_task(urgency, function, argvs, kwargs)
        )

    def exec_worker(self, check_interval, func_queue, worker_name=''):
        """
        Worker loop: take tasks from func_queue and run them until the
        service is stopping.

        :param check_interval:
            timeout in seconds of each blocking queue get, i.e. how often
            the stopping flag is re-checked while the queue is empty
        :param func_queue:
            queue to consume (the delay queue or the exec queue)
        :param worker_name:
            name used in log messages
        """
        while self._status != 2:
            try:
                item = func_queue.get(timeout=check_interval)
            except queue.Empty:
                continue
            function = None
            argvs = None
            kwargs = None
            try:
                # The call info is always the last element, whether or not
                # the entry carries a sequence number.
                function, argvs, kwargs = item[-1]
                # pylint: disable=W0142
                if func_queue is self._delay_queue:
                    log.debug('to delay exec func:{0}'.format(function))
                function(*argvs, **kwargs)
            # pylint: disable=W0703
            # we can NOT predict the exception type
            except Exception as error:
                log.warn(
                    '{0} worker encountered exception:{1}, func:{2},'
                    'args:{3} {4} , executor service({5})'.format(
                        worker_name, error, function, argvs, kwargs,
                        self._name)
                )
                log.warn('error type:{0}'.format(type(error)))
        log.debug(
            '{0} worker thread exited as the service '
            'is stopping'.format(worker_name)
        )

    def run(self):
        """
        Start the thread pool and the workers.

        Delayexec workers check the delay queue every 100ms
        Exec workers check the exec queue every 20ms
        """
        self._thdpool.start()
        self._status = 1
        for _ in range(0, self._delay_exe_thdnu):
            self._thdpool.add_1job(
                self.exec_worker,
                0.1,
                self._delay_queue,
                'Delayexec'
            )
        for _ in range(0, self._queue_exe_thdnum):
            self._thdpool.add_1job(
                self.exec_worker,
                0.02,
                self._exec_queue,
                'Exec'
            )
        log.info('Executor service {0} started'.format(self._name))

    def start(self):
        """alias for self.run"""
        return self.run()

    def stop(self, wait_workerstop=True):
        """
        stop the executor service.

        :param wait_workerstop:
            If wait_workerstop is True, the function will hang until all
            workers finish their tasks.

            Otherwise, the function will not hang, but tell you whether it
            has stopped successfully.

        :return:
            True after a blocking stop; otherwise whatever the thread
            pool's try_stop reports.
        """
        log.info('to stop executor {0}'.format(self._name))
        self._status = 2
        if wait_workerstop:
            self._thdpool.stop()
            stopped = True
        else:
            # NOTE(review): assumes try_stop returns the "stopped or not"
            # result described above - confirm against cup.util.threadpool
            stopped = self._thdpool.try_stop()
        log.info('end stopping executor {0}'.format(self._name))
        return stopped
class CronTask(object):
    """
    crontask for CronExecution

    A field set to None in timer_dict means "every valid value".
    Weekdays are numbered 1 (Monday) to 7 (Sunday).

    Typical examples for timer_dict:

    1. every year on Jan 1st, 18:01
    ::

        timer_dict = {
            'minute': [1],
            'hour': [18],
            'weekday': None,
            'monthday': [1],
            'month': [1]
        }

    2. every minute
    ::

        timer_dict = {
            'minute': [0, 1, 2, ....59],
            'hour': None,
            'weekday': None,
            'monthday': None,
            'month': None
        }

    3. every hour
    ::

        timer_dict = {
            'minute': [0],
            'hour': None,
            'weekday': None,
            'monthday': None,
            'month': None
        }

    4. every day at 20:00
    ::

        timer_dict = {
            'minute': [0],
            'hour': [20],
            'weekday': None,
            'monthday': None,
            'month': None
        }

    5. every 20:00 at workdays (Monday to Friday)
    ::

        timer_dict = {
            'minute': [0],
            'hour': [20],
            'weekday': [1, 2, 3, 4, 5],
            'monthday': None,
            'month': None
        }

    6. every 20:00 at workdays for Jan and July
    ::

        timer_dict = {
            'minute': [0],
            'hour': [20],
            'weekday': [1, 2, 3, 4, 5],
            'monthday': None,
            'month': [1, 7]
        }

    7. every 20:00 at 1st, 3rd of Jan and July
    ::

        timer_dict = {
            'minute': [0],
            'hour': [20],
            'weekday': None,
            'monthday': [1, 3],
            'month': [1, 7]
        }
    """
    _CHECK_ORDER = ['month', 'monthday', 'weekday', 'hour', 'minute']
    # values used for a field that is None in timer_dict
    _NONE_FILLING = {
        'month': range(1, 13),
        'monthday': range(1, 32),
        'weekday': range(1, 8),
        'hour': range(24),
        'minute': range(60)
    }
    _GEN = generator.CachedUUID()
    # Upper bound of allowed months that next_month inspects before giving
    # up. Feb 29th is the rarest date: at most 8 years, i.e. 96 months, away.
    _MAX_MONTH_LOOKAHEAD = 12 * 10

    def __init__(
        self, name, pytz_timezone, timer_dict, md5uuid,
        function, *args, **kwargs
    ):
        """
        :param name:
            name of the crontask

        :param pytz_timezone:
            which can be initialized like: tz = pytz.timezone('Asia/Shanghai')

        :param timer_dict:

            {   'minute': minute_list,
                'hour': hour_list,
                'weekday': weekday_list,    # [1~7]
                'monthday': monday_list,    # [1~31]
                'month': month_list,        # [1~12]
            } # None stands for all valid, don't consider this field

        :param md5uuid:
            id of the task; a new one is generated if it is None
        :param function:
            function that to be scheduled
        :param args:
            args of function
        :param kwargs:
            key args of function

        :raise:
            ValueError if function is not callable, the timezone is not a
            pytz timezone, a key is missing in timer_dict or a field is an
            empty list
        """
        if not callable(function):
            raise ValueError('param function should be callable')
        if not isinstance(pytz_timezone, pytz.BaseTzInfo):
            raise ValueError('not a valid pytz timezone')
        self._name = name
        self._funcargs = (function, args, kwargs)
        self._pytz = pytz_timezone
        self._timer_dict = timer_dict
        if not all([
            'minute' in timer_dict,
            'hour' in timer_dict,
            'weekday' in timer_dict,
            'monthday' in timer_dict,
            'month' in timer_dict
        ]):
            raise ValueError('keys '
                '(minute hour weekday monthday month should be in dict)'
            )
        self._timer_params = self._generate_timer_params(self._timer_dict)
        self._check_param_valids(self._timer_params)
        self._lastsched_time = None
        if md5uuid is None:
            self._md5_id = self._GEN.get_uuid()[0]
        else:
            self._md5_id = md5uuid
        self._timer = None

    def get_funcargs(self):
        """return (function, args, kwargs)"""
        return self._funcargs

    def name(self):
        """return name of the crontask"""
        return self._name

    def taskid(self):
        """get 32 byte taskid"""
        return self._md5_id

    def last_schedtime(self):
        """return last schedtime"""
        return self._lastsched_time

    def _generate_timer_params(self, timer_dict):
        """
        generate timer params: every field as a sorted list of valid values,
        a None field is replaced by all valid values
        """
        tmp_timer_dict = {}
        for check_key in self._CHECK_ORDER:
            valid_items = timer_dict[check_key]
            if valid_items is None:
                tmp_timer_dict[check_key] = self._NONE_FILLING[check_key]
            else:
                tmp_timer_dict[check_key] = sorted(valid_items)
        return tmp_timer_dict

    def pytz_timezone(self):
        """return pytz timezone"""
        return self._pytz

    def set_last_schedtime(self, datetime_obj):
        """
        set_last_schedtime.

        :param datetime_obj:
            datetime.datetime object of the last schedule time

        :raise:
            ValueError if datetime_obj is not a datetime.datetime
        """
        if not isinstance(datetime_obj, datetime.datetime):
            raise ValueError('datetime_obj should be a datetime.datetime obj')
        self._lastsched_time = datetime_obj

    def get_last_schedtime(self):
        """
        get last sched time, return with a datetime.datetime object.
        Plz notice the timezone is enabled
        """
        return self._lastsched_time

    @classmethod
    def _check_param_valids(cls, timer_params):
        """
        check if params r valid

        :raise:
            ValueError if a field has no valid value
        """
        for check_key in timer_params:
            if len(timer_params[check_key]) < 1:
                raise ValueError('{0} less than 1 element'.format(check_key))

    @classmethod
    def next_month(cls, tmp_dict, timer_params):
        """
        set tmp_dict to next valid date, specifically month

        The month is advanced (wrapping into the next year when needed)
        until an allowed month is found that contains at least one of the
        allowed monthdays. Day, hour and minute are then reset to their
        first valid values. timer_params is not modified.

        :param tmp_dict:
            ::

                {
                    'year': xxxx,
                    'month': xxxx,
                    'monthday': xxxx,
                    'weekday': xxxx,
                    'hour': xxxx,
                    'minute': xxxx
                }

        :param timer_params:
            valid timer dict, same to self._timer_params

        :raise:
            ValueError if none of the allowed months ever contains an
            allowed monthday (e.g. monthday [31] with month [4])
        """
        for _ in range(cls._MAX_MONTH_LOOKAHEAD):
            while True:
                tmp_dict['month'] += 1
                if tmp_dict['month'] in timer_params['month']:
                    break
                if tmp_dict['month'] > 12:
                    tmp_dict['month'] = timer_params['month'][0]
                    tmp_dict['year'] += 1
                    break
            days_in_month = calendar.monthrange(
                tmp_dict['year'], tmp_dict['month']
            )[1]
            monthday = [
                x for x in range(1, days_in_month + 1)
                if x in timer_params['monthday']
            ]
            # e.g. only the 31st is allowed and we landed on April:
            # keep searching instead of failing on an empty list
            if monthday:
                break
        else:
            raise ValueError(
                'no valid monthday found in months {0} for monthdays '
                '{1}'.format(
                    list(timer_params['month']),
                    list(timer_params['monthday'])
                )
            )
        tmp_dict['monthday'] = monthday[0]
        tmp_dict['hour'] = timer_params['hour'][0]
        tmp_dict['minute'] = timer_params['minute'][0]
        tmp_dict['weekday'] = calendar.weekday(
            tmp_dict['year'], tmp_dict['month'], tmp_dict['monthday']
        ) + 1

    @classmethod
    def check_monthday_weekday(cls, tmp_dict, timer_params):
        """
        check if the date in tmp_dict exists and falls on an allowed weekday

        Membership of the monthday in timer_params['monthday'] is NOT
        checked here, callers check it separately.

        :return:
            False for an invalid date (e.g. 4.31) or a weekday that is not
            allowed, True otherwise
        """
        try:
            day = calendar.weekday(
                tmp_dict['year'], tmp_dict['month'], tmp_dict['monthday']
            ) + 1
        # e.g. invalid date, 4.31
        except ValueError:
            return False
        return day in timer_params['weekday']

    @classmethod
    def next_monthday_weekday(cls, tmp_dict, timer_params):
        """
        set next monthday && weekday

        Advances tmp_dict day by day (rolling over to the next month when
        needed) until the monthday is allowed and falls on an allowed
        weekday, then resets hour and minute to their first valid values.
        """
        # plus is 0 right after a month rollover, so that the day chosen
        # by next_month is checked itself before stepping further
        plus = 1
        while True:
            tmp_dict['monthday'] += plus
            if plus == 0:
                plus = 1
            if all([
                tmp_dict['monthday'] in timer_params['monthday'],
                cls.check_monthday_weekday(tmp_dict, timer_params)
            ]):
                tmp_dict['hour'] = timer_params['hour'][0]
                tmp_dict['minute'] = timer_params['minute'][0]
                break
            else:
                if tmp_dict['monthday'] > 31:
                    cls.next_month(tmp_dict, timer_params)
                    plus = 0

    @classmethod
    def next_hour(cls, tmp_dict, timer_params):
        """
        set tmp_dict to the next allowed hour (rolling over to the next
        valid day when needed) and reset minute to its first valid value
        """
        # plus is 0 right after a day rollover, see next_monthday_weekday
        plus = 1
        while True:
            tmp_dict['hour'] += plus
            if plus == 0:
                plus = 1
            if tmp_dict['hour'] in timer_params['hour']:
                tmp_dict['minute'] = timer_params['minute'][0]
                break
            else:
                if tmp_dict['hour'] > 23:
                    cls.next_monthday_weekday(tmp_dict, timer_params)
                    plus = 0

    @classmethod
    def next_minute(cls, tmp_dict, timer_params):
        """
        set tmp_dict to the next allowed minute, rolling over to the next
        valid hour when needed
        """
        # plus is 0 right after an hour rollover, see next_monthday_weekday
        plus = 1
        while True:
            tmp_dict['minute'] += plus
            if plus == 0:
                plus = 1
            if tmp_dict['minute'] in timer_params['minute']:
                break
            else:
                if tmp_dict['minute'] > 59:
                    cls.next_hour(tmp_dict, timer_params)
                    plus = 0

    def next_schedtime(self, starting_fromdate=None):
        """
        return next schedule time with timezone enabled.

        The returned time is also stored as the last schedule time.

        :param starting_fromdate:
            datetime to start searching from (the search starts at the
            minute after it). Defaults to now.

        :return:
            a datetime localized to the task timezone, or None if no valid
            schedule time can be found
        """
        if starting_fromdate is None:
            # NOTE(review): the machine's local wall clock is labelled with
            # the task timezone here, it is not converted. Looks wrong when
            # the machine timezone differs from the task timezone, but
            # CronExecution.schedule computes "now" the same way, so both
            # would have to change together - confirm.
            tmp = datetime.datetime.now()
            datenow = self._pytz.localize(tmp)
        else:
            datenow = starting_fromdate
        tmp_dict = {
            'year': datenow.year,
            'month': datenow.month,
            'monthday': datenow.day,
            'weekday': datenow.isoweekday(),
            'hour': datenow.hour,
            'minute': datenow.minute + 1
        }
        # Helpers only ever get this private copy, never the parameters
        # owned by the task.
        timer_params = copy.deepcopy(self._timer_params)
        maxtimes = 365 * 24 * 60
        try:
            while True:
                if tmp_dict['month'] not in timer_params['month']:
                    self.next_month(tmp_dict, timer_params)
                    continue
                # The starting day has to be an allowed monthday as well as
                # an allowed weekday.
                day_ok = (
                    tmp_dict['monthday'] in timer_params['monthday'] and
                    self.check_monthday_weekday(tmp_dict, timer_params)
                )
                if not day_ok:
                    self.next_monthday_weekday(tmp_dict, timer_params)
                    continue
                if tmp_dict['hour'] not in timer_params['hour']:
                    self.next_hour(tmp_dict, timer_params)
                    continue
                if tmp_dict['minute'] in timer_params['minute']:
                    break
                self.next_minute(tmp_dict, timer_params)
                maxtimes -= 1
                if maxtimes < 0:
                    log.warn(
                        'No valid datetime in a year'
                        'for crontask {0}'.format(self)
                    )
                    return None
        except ValueError as error:
            # raised by next_month when the month/monthday combination can
            # never match
            log.warn(
                'No valid schedule time for crontask {0}: {1}'.format(
                    self, error
                )
            )
            return None
        local_dt = self._pytz.localize(datetime.datetime(
            year=tmp_dict['year'],
            month=tmp_dict['month'],
            day=tmp_dict['monthday'],
            hour=tmp_dict['hour'],
            minute=tmp_dict['minute']
        ))
        self.set_last_schedtime(local_dt)
        return local_dt

    def __repr__(self):
        """return info of the crontask"""
        return 'CronTask(ID:{0} Name:{1})'.format(self.taskid(), self._name)

    def set_timer(self, timer):
        """set timer to this crontask"""
        self._timer = timer

    def get_timer(self):
        """return timer of the crontask"""
        return self._timer
class CronExecution(ExecutionService):
    """
    run execution like cron.

    Plz notice the following circumstances:

    - if the previous task is still running and the scheduling time comes,
      executor will wait until the previous task finishes
    """
    def __init__(self, threads_num=3, name=None):
        """
        :param threads_num:
            number of worker threads
        :param name:
            name of the service, used in log messages
        """
        self._task_dict = {}
        # Tie-breaker for queue entries of equal urgency. Without it the
        # priority queue falls back to comparing CronTask objects, which
        # raises TypeError on Python 3.
        self._cron_seq = itertools.count()
        ExecutionService.__init__(
            self, delay_exe_thdnum=threads_num,
            queue_exec_thdnum=0,
            name=name
        )

    def get_tasks(self):
        """get all cron execution tasks as (taskid, crontask) items"""
        return self._task_dict.items()

    def get_taskbyid(self, md5uuid):
        """
        :param md5uuid:
            md5sum of uuid

        :return:
            the crontask, or None if the id is unknown
        """
        return self._task_dict.get(md5uuid, None)

    def exec_worker(self, check_interval, func_queue, worker_name=''):
        """
        Worker loop: run due crontasks and schedule their next run until
        the service is stopping.

        :param check_interval:
            timeout in seconds of each blocking queue get
        :param func_queue:
            queue to consume
        :param worker_name:
            name used in log messages
        """
        log.info('CronExecution exec worker started')
        while self._status != 2:
            try:
                item = func_queue.get(timeout=check_interval)
            except queue.Empty:
                continue
            crontask = None
            function = None
            argvs = None
            kwargs = None
            try:
                # The crontask and the call info are always the last two
                # elements, whether or not the entry has a sequence number.
                crontask = item[-2]
                function, argvs, kwargs = item[-1]
                # pylint: disable=W0142
                if func_queue is self._delay_queue:
                    log.debug('to delay exec func:{0}'.format(function))
                dtnow = datetime.datetime.now(crontask.pytz_timezone())
                if (dtnow - crontask.get_last_schedtime()).total_seconds() > 60:
                    log.warn(
                        'lagging crontask found (name:{0} id: {1})'.format(
                            crontask.name(), crontask.taskid()
                        )
                    )
                function(*argvs, **kwargs)
            # pylint: disable=W0703
            # we can NOT predict the exception type
            except Exception as error:
                log.warn(
                    '{0} worker encountered exception:{1}, func:{2},'
                    'args:{3} {4} , executor service({5})'.format(
                        worker_name, error, function, argvs, kwargs,
                        self._name)
                )
                log.warn('error type:{0}'.format(type(error)))
            if crontask is None:
                continue
            # Reschedule even if this run failed, otherwise one exception
            # would silently end the cron task for good.
            try:
                self.schedule(crontask)
            # pylint: disable=W0703
            except Exception as error:
                log.warn(
                    '{0} worker failed to reschedule crontask {1}: {2}, '
                    'executor service({3})'.format(
                        worker_name, crontask, error, self._name)
                )
        log.debug(
            '{0} worker thread exited as the service '
            'is stopping'.format(worker_name)
        )

    def delay_exec(self,
        delay_time_insec, crontask, function, urgency, *args, **kwargs
    ):
        """
        delay_execute function after delay_time seconds

        You can use urgency := executor.URGENCY_NORMAL, by default

        :param delay_time_insec:
            seconds to wait before the task is put into the delay queue
        :param crontask:
            the CronTask this run belongs to; the timer is attached to it
        :param function:
            callable to execute
        :param urgency:
            priority of the task, smaller runs first

        :TODO:
            consider about stopping timers when invoking stop function
        """
        log.info(
            'CronExecution {0} got delay exec, func:{1}'.format(
                self._name, function)
        )
        task_data = (
            urgency, next(self._cron_seq), crontask,
            (function, args, kwargs)
        )
        timer = threading.Timer(
            delay_time_insec, self._do_delay_exe,
            [task_data]
        )
        crontask.set_timer(timer)
        timer.start()

    def schedule(self, crontask):
        """
        schedule the crontask: compute its next schedule time, start a
        timer for it and register the task in this executor.

        If the task has no valid schedule time, it is removed from the
        executor and nothing is scheduled.

        :param crontask:
            the CronTask to schedule
        """
        next_schedtime = crontask.next_schedtime()
        if next_schedtime is None:
            log.warn(
                'CronExecution:crontask {0} will be deleted '
                'from the crontask as '
                'no valid schedule time is found'.format(crontask)
            )
            self._task_dict.pop(crontask.taskid(), None)
            return
        function, args, kwargs = crontask.get_funcargs()
        tmpnow = crontask.pytz_timezone().localize(datetime.datetime.now())
        wait_seoncds = (next_schedtime - tmpnow).total_seconds()
        log.info(
            'CronExecution: next schedule time for this crontask is {0} '
            'timezone {1}, wait for {2} seconds, timenwo is {3}'.format(
                next_schedtime, next_schedtime.tzinfo,
                wait_seoncds,
                next_schedtime.tzinfo.localize(datetime.datetime.now())
            )
        )
        # pylint: disable=W0142
        self.delay_exec(
            wait_seoncds, crontask, function, URGENCY_NORMAL, *args, **kwargs
        )
        self._task_dict[crontask.taskid()] = crontask

    def calcel_delay_exec(self, taskid):
        """
        cancel the pending timer of a crontask by taskid

        The task stays registered in the executor. A run that has already
        left the timer (queued or running) is not affected.
        """
        task = self._task_dict.get(taskid, None)
        if task is None:
            log.warn('delay exec task id {0} not found'.format(taskid))
            return
        timer = task.get_timer()
        if timer is None:
            log.warn('delay exec task id {0} has no timer'.format(taskid))
            return
        timer.cancel()

    # correctly spelt alias, the original name is kept for compatibility
    cancel_delay_exec = calcel_delay_exec
# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent