cup.services package

@author Guannan Ma @mythmgn

Submodules

cup.services.autowait module

description:

auto wait related modules.

cup.services.autowait.wait_until_file_exist(dst_path, file_name, max_wait_sec=10, interval_sec=2, recursive=False)[source]

wait util the file exists or the function timeout

Parameters:
  • dst_path – searching path

  • file_name – filename, support *

  • max_wait_sec – max wating time until timeout

  • interval_sec – check interval

  • recursive – recursively search or not

Returns:

True if found.

cup.services.autowait.wait_until_port_not_used(port, max_wait_sec=10, interval_sec=0.5)[source]

wait until the port is free

Returns:

return True if the port is free before timeout

cup.services.autowait.wait_until_port_used(port, max_wait_sec=10, interval_sec=0.5)[source]

wait until the port is used. Notice this function will invoke a bash shell to execute command [netstat]!

Returns:

return True if the port is used

cup.services.autowait.wait_until_process_exist(process_path, max_wait_sec=10, interval_sec=0.5)[source]

wait until the process exists

Parameters:
  • process_path – the specific process working path

  • max_wait_sec – maximum waiting time until timeout

  • interval_sec – state check interval

Returns:

return True if the process is found before timeout

cup.services.autowait.wait_until_process_killed(process_path, ports, max_wait_sec=10, interval_sec=0.5)[source]

wait until the [process] does not exists and all [ports] are free

Parameters:
  • process_path – process cwd

  • ports – port list

  • interval_sec – state check interval

Returns:

True if all conditions meet.

cup.services.autowait.wait_until_process_not_exist(process_path, max_wait_sec=10, interval_sec=0.5)[source]

wait until the process does not exist anymore or the function timeouts

Parameters:
  • process_path – process cwd

  • max_wait_sec – maximum waiting time until timeout. 10 seconds by default

  • interval_sec – state check interval, 0.5 second by default

Returns:

return True if the process disapper before timeout

cup.services.autowait.wait_until_process_used_ports(process_path, ports, max_wait_sec=10, interval_sec=0.5)[source]

wait until the process has taken the ports before timeouts

Returns:

True if all ports are used by the specific process. False, otherwise

cup.services.autowait.wait_until_reg_str_exist(dst_file_path, reg_str, max_wait_sec=10, interval_sec=0.5)[source]

wait until any line in the file matches the reg_str(regular expression string)

Parameters:
  • dst_file_path – searching path

  • reg_str – regular expression string

  • max_wait_sec – maximum waiting time until timeout

  • interval_sec – state check interval

Returns:

True if found

cup.services.buffers module

description:

buffer pool

class cup.services.buffers.Buffer(items, block_size, uniqid)[source]

Bases: object

Buffer object which you get from BufferPool.allocate(num).

A Buffer consists of several bytearrays which is mutable compared to a normal str. In other words, if you have senarios like: allocte mem and deallocte mem frenquently. With high chance you can replace directly using str by Buffer. It will reduce the memory fragments.

get()[source]

return (True, (blocks, block_size, total_length, encoding)) if succeed

Otherwise, return (False, err_msg, None)

get_byte_arrays()[source]

get byte arrays in the buffer

get_uniq_id()[source]

return the uniqid for this object

length()[source]

return the length you have used for the buffer

maxsize()[source]

return how many unicode/str you can set to the buffer

set(content, encoding='utf8')[source]

set content to buffers

Returns:

return (True, None) if succeed. return (False, error_msg) otherwise

class cup.services.buffers.BufferPool(pool_size, block_size=135168, extendable=False)[source]

Bases: object

Buffer pool class which will ease memory fragment.

allocate(num)[source]

acclocate buff with num * block_size

Returns:

(True, Buffer object)

(False, str_error_msg)

deallocate(buff)[source]

return the acclocated buff back to the pool

cup.services.executor module

description:
  1. Delay-execute sth after several seconds

  2. Schedule some tasks in a queue.

class cup.services.executor.AbstractExecution(delay_exe_thdnum, queue_exec_thdnum, name)[source]

Bases: object

abstract execution service

abstract delay_exec(delay_time_insec, function, urgency, *args, **kwargs)[source]

delay exec for the abstract

abstract exec_worker(check_interval, func_queue, worker_name)[source]

exec worker

class cup.services.executor.CronExecution(threads_num=3, name=None)[source]

Bases: ExecutionService

run execution like cron.

Plz notice the following circumstances:
  • if the previous task is still running and the scheduing time comes,

    executor will wait until the previous task finishes

calcel_delay_exec(taskid)[source]

calcel delayexec by taskid

delay_exec(delay_time_insec, crontask, function, urgency, *args, **kwargs)[source]

delay_execute function after delay_time seconds

You can use urgency := executor.URGENCY_NORMAL, by default

TODO:

consider about stopping timers when invoking stop function

exec_worker(check_interval, func_queue, worker_name='')[source]

exec worker

get_taskbyid(md5uuid)[source]
Parameters:

md5uuid – md5sum of uuid

get_tasks()[source]

get all cron execution tasks

schedule(crontask)[source]

schedule the crontask

Parameters:
  • timer_dict

    {   'minute': minute_list,
        'hour': hour_list,
        'weekday': weekday_list,
        'monthday': monday_list,
        'month': month_list
    }
    

  • function – function that to be scheduled

  • args – args of function

  • kwargs – key args of function

class cup.services.executor.CronTask(name, pytz_timezone, timer_dict, md5uuid, function, *args, **kwargs)[source]

Bases: object

crontask for CronExecution

Typical exmaples for timer_dict:

1. one specific time Jan 1st, 2020 18:01

timer_dict {
    'minute': [1], 'hour': [18], 'weekday': None,
    'monthday': [1], 'month': [1]
}

2. every minute {

timer_dict {
        'minute': [0, 1, 2, ....59], 'hour': None, 'weekday': None,
        'monthday': None, 'month': None
}

3. every hour

timer_dict {
        'minute': [0], 'hour': None, 'weekday': None,
        'monthday': None, 'month': None
}

4. every 20:00 PM

timer_dict {
    'minute': [0], 'hour': [20], 'weekday': None,
    'monthday': None, 'month': None
}

5. every 20:00 PM at workdays (Monday to Friday)

timer_dict {
        'minute': [0], 'hour': [20], 'weekday': [1, 2, 3, 4, 5],
        'monthday': None, 'month': None
}

6. every 20:00 PM at workdays for Jan and July

timer_dict {
    'minute': [0], 'hour': [20], 'weekday': [1, 2, 3, 4, 5],
    'monthday': None, 'month': [1, 7]
}

7. every 20:00 PM at 1st, 3rd of Jan and July

timer_dict {
    'minute': [0], 'hour': [20], 'weekday': None,
    'monthday': [1, 3], 'month': [1, 7]
}
classmethod check_monthday_weekday(tmp_dict, timer_params)[source]

check if monthday / weekday valid

get_funcargs()[source]

return (function, args, kwargs)

get_last_schedtime()[source]

get last sched time, return with a datetime.datetime object. Plz notice the timezone is enabled

get_timer()[source]

return timer of the crontask

last_schedtime()[source]

return last schedtime

name()[source]

return name of the crontask

classmethod next_hour(tmp_dict, timer_params)[source]
Returns:

{ ‘year’: xxx,

’month’: xxx, ‘monthday’:

classmethod next_minute(tmp_dict, timer_params)[source]
Returns:

{ ‘year’: xxx,

’month’: xxx, ‘monthday’:

classmethod next_month(tmp_dict, timer_params)[source]

set tmp_dict to next valid date, specifically month

Parameters:
  • tmp_dict

    {
        'year': xxxx,
        'month': xxxx,
        'monthday': xxxx,
        'weekday': xxxx,
        'hour': xxxx,
        'minute': xxxx
    }
    

  • timer_params – valid timer dict, same to self._timer_params

classmethod next_monthday_weekday(tmp_dict, timer_params)[source]

set next monthday && weekday

next_schedtime(starting_fromdate=None)[source]

return next schedule time with timezone enabled.

pytz_timezone()[source]

return pytz timezone

set_last_schedtime(datetime_obj)[source]

set_last_schedtime.

Parameters:

timestamp – E.g. set timestamp to time.time() for “now”

set_timer(timer)[source]

set timer to this crontask

taskid()[source]

get 32 byte taskid

class cup.services.executor.ExecutionService(delay_exe_thdnum=3, queue_exec_thdnum=4, name=None)[source]

Bases: AbstractExecution

execution service

delay_exec(delay_time_insec, function, urgency, *args, **kwargs)[source]

delay_execute function after delay_time seconds

You can use urgency := executor.URGENCY_NORMAL, by default

TODO:

consider about stopping timers when invoking stop function

exec_worker(check_interval, func_queue, worker_name='')[source]

exec worker

queue_exec(function, urgency, *argvs, **kwargs)[source]

execute function in a queue. Functions will be queued in line to be scheduled.

You can use urgency := executor.URGENCY_NORMAL, by default.

run()[source]

Delayexec worker checks task every 20ms QueueExec worker checks task every 100ms

start()[source]

alias for self.run

stop(wait_workerstop=True)[source]

stop the executor service.

Wait_workerstop:

If wait_workerstop is True, the function will hang util all workers finish thier tasks.

Otherwise, the function will not hang, but tell you whether it’s succeeded stopped. (True for stoped, False for not stopped yet)

cup.services.generator module

class CGeneratorMan(object)

Generate unique integers, strings and auto incremental uint. Notice CGeneratorMan is a singleton class, which means cup will keep only 1 instance per process.

init:
__init__(self, str_prefix=get_local_hostname())

local hostname will be used by default.

methods:
get_uniqname()

get unique name. Host-Level unique name (build upon str_prefix, pid, threadid)

get_next_uniq_num()

Process-level auto incremental uint. Thread-safe

reset_uniqid_start(num=0)

Reset next uniqid to which genman starts from

get_random_str()

Get random string by length

get_uuid()

Get uuid

class cup.services.generator.CycleIDGenerator(ip, port)[source]

Bases: object

cycle id generator. 128bit ID will be produced.

128 bit contains: a. 64bit [ip, port, etc] b. 64bit[auto increment id]

classmethod id2_hexstring(num)[source]

return hex of the id

next_id()[source]

get next id

reset_nextid(nextid)[source]

reset nextid that will return to you

cup.services.heartbeat module

description:

heartbeat related module

class cup.services.heartbeat.Device(name)[source]

Bases: object

Base class for all devices in heartbeat service

deserilize(binary)[source]

deserilize it from binary

get_dict_resinfo()[source]

get dict of resource info

get_last_healthy()[source]

get last_healthy time of the device

get_name()[source]

get name

serilize()[source]

serilize device info

set_last_healthy()[source]

set last_healthy time

class cup.services.heartbeat.HeartbeatService(judge_lost_in_sec, keep_lost=False)[source]

Bases: object

HeartBeat service

adjust_judge_lost_time(time_in_sec)[source]

adjust judge_lost_in_sec

cleanup_oldlost(dump_file=None)[source]

cleanup old lost devices.

Parameters:

dump_file – if dump_file is not None, we will store devices info into dump_file Otherwise, we will cleanup the lost devices only.

get_lost()[source]

get lost devices

is_device_registered(key, including_dead=False)[source]

tell if the device is registered

refresh(key, device_obj=None)[source]
Parameters:

key – refresh the device by key

Returns:

if key does not exist, return False else, fresh the last_healthy time of the device

class cup.services.heartbeat.LinuxHost(name, init_this_host=False, iface='eth0', port=0)[source]

Bases: Device

a linux host resource

get_cpu_idle()[source]

get cpu idle rate

get_ip()[source]

return ip information

get_ip_port()[source]

return ip:port

get_mem_info()[source]
Returns:

(mem_inuse, mem_total), in MB

get_net_usage()[source]
Returns:

(net_in, net_out)

set_cpu_idle(idle_rate)[source]

set cpu idle rate

set_ip_port(ipaddr)[source]

set ip information

Parameters:

ipaddr – ipaddr should be string and something like 10.10.10.1

set_linux_res_bydict(info_dict)[source]
Parameters:

info_dict

{
    'iface': 'eth0',
    'ipaddr': '10.10.10.1',
    'port':   8089,
    'cpu_idle': 50,
    'mem_inuse': 1024,        # MB
    'mem_total': 8192,
    'net_in':    8192,        # kb
    'net_out':   102400,      # kb
}

set_mem_usage(mem_inuse, mem_total)[source]

set up mem_inuse and mem_total. Will update any of them if it is not None.

set_net_usage(net_in, net_out)[source]
Parameters:
  • net_in – net_in in kB/s. If net_in is None, will update nothing.

  • net_out – net_out in kB/s. If net_out is None, will update nothing.

class cup.services.heartbeat.MacHost(name, init_this_host=False, iface='en01', port=0)[source]

Bases: Device

a mac host

class cup.services.heartbeat.RestrictedUnpickler(file, *, fix_imports=True, encoding='ASCII', errors='strict', buffers=())[source]

Bases: Unpickler

find_class(module, name)[source]

Only allow safe classes from builtins

Only check builtins when it’s run under py 3

cup.services.heartbeat.restricted_loads(s)[source]

Helper function analogous to pickle.loads()

cup.services.msgbroker module

description:

Msg Broker Service. Every component of a process can produce_msg.

This msg broker feature is still exprimental. Do not use it in production until this comment is deleted.

class cup.services.msgbroker.BrokerCenter(name)[source]

Bases: BaseBroker

Errmsg broker center

comsume_msg(msg_type)[source]

get msg_type from the broker center

produce_msg(msg_type, extra_info, error)[source]

register msg

class cup.services.msgbroker.SystemErrmsgBroker(name)[source]

Bases: BrokerCenter

system errmsg broker, you can use it to determine whether exiting from the system is on the way

clean_data(path, exclude_msgtypes=None)[source]

clean data of the remaining data

fatal_alert(path, msg, need_stop=True)[source]

fatal alert systems

get_fatal_alerts(path)[source]

get fatal alerts of the current running round

need_stop(path)[source]
return True if the system registered on

the path needs to stop immediately

register_msg(path, msgtype, msg)[source]

register msg into the system

register_msgtype_callback(path, msg_type, callback_func)[source]

register msgtype with callback functions

register_wakeup(path, msgtype, alert_cap_num, callfunc)[source]

register wakeups.

Parameters:
  • alert_cap_num – If alert_cap_num is 0, whenever a msg of msgtype is received, the callfunc will be called.

  • msgtype – [msgbroker.FATAL|msgbroker.WARN]

warnning_alert(path, msg)[source]

warnning alert

cup.services.serializer module

description:

serilizers including local file serilizer

class cup.services.serializer.LocalFileSerilizer(storage_dir, skip_badlog=False, max_logfile_size=1048576, persist_after_sec=600)[source]

Bases: BaseSerilizer

local file serilizer

add_log(log_type, log_mode, log_binary)[source]

add log into the local file

Returns:

a tuple (result_True_or_False, logid_or_None)

close_read()[source]

close open4read

close_write()[source]

close the writer

get_subdir(log_id=-1)[source]

get log dir

is_empty()[source]

return if there is no log

is_stream_wbopen()[source]

is stream open

open4read()[source]

open logs for read

open4write(truncate_last_failure=True)[source]
Raises:

Exception – if encounter any IOError, will raise IOError(errmsg)

purge_data(before_logid)[source]

log files which contains log (less than before_logid) will be purged.

read(record_num=128)[source]

load log into memory

Notice:

If skip_badlog is not True, will raise IOError if the stream encounters any error.

Otherwise, the stream will skip the bad log file, move to next one and continue reading

Returns:

  1. return a list of “record_num” of LogRecord.

b. If the count number of list is less than record_num, it means the stream encounter EOF, plz read again afterwards.

  1. If the returned is None, it means the stream got nothing, plz

    try again.

set_current_logid(logid)[source]

reset current log id

set_name(name)[source]

set a name of str for the serializer

switch_logfilelist()[source]

switch logfile to logfile.old

class cup.services.serializer.LogRecord(length, log_id, log_type, log_mode, log_binary)

Bases: tuple

length

Alias for field number 0

log_binary

Alias for field number 4

log_id

Alias for field number 1

log_mode

Alias for field number 3

log_type

Alias for field number 2

cup.services.threadpool module

Note

Guannan back-ported threadpool from twisted.python. if any concern, feel free to contact Guannan (mythmgn@gmail.com)

Note

Mit License applied for twisted:

http://www.opensource.org/licenses/mit-license.php

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

class cup.services.threadpool.ThreadPool(minthreads=5, maxthreads=20, name=None, daemon_threads=False)[source]

Bases: object

Threadpool class

add_1job(func, *args, **kwargs)[source]

Add one job that you want the pool to schedule.

Caution

Notice if you need to handle data after finishing [func], plz use [add_1job_with_callback] which supports a [callback] option.

Parameters:
  • func – function that will be scheduled by the thread pool

  • args – args that the [func] needs

  • kw – kwargs that [func] needs

add_1job_with_callback(result_callback, func, *args, **kwargs)[source]
Parameters:
  • result_callback

    Important

    plz notice whether succeed or fail, the result_callback function will be called after [func] is called.

    function result_callback needs to accept two parameters: (ret_in_bool, result). (True, result) will be passed to the [func] on success. (False, result) will be passed otherwise.

    if [func] raise any Exception, result_callback will get (False, failure_info) as well.

  • func – same to func for add_1job

  • args – args for [func]

  • kwargs – kwargs for [func]

adjust_poolsize(minthreads=None, maxthreads=None)[source]

adjust pool size

dump_stats(print_stdout=False)[source]

Dump the threadpool stat to log or stdout. Info is from class method [get_stats]

get_stats()[source]

get threadpool running stats waiters_num is pending thread num working_num is working thread num thread_num is the total size of threads

stat = {}
stat['queue_len'] = self._jobqueue.qsize()
stat['waiters_num'] = len(self._waiters)
stat['working_num'] = len(self._working)
stat['thread_num'] = len(self._threads)
start()[source]

call start before you use the threadpool

start1worker()[source]

add a thread for worker threads in the pool

stop(force_stop=False)[source]

stop the thread pool. Notice calling this method will wait there util all worker threads exit.

Parameters:

force_stop – if force_stop is True, try to stop the threads in the pool immediately (and this may do DAMAGE to your code logic)

stop1worker()[source]

decrease one thread for the worker threads

try_stop(check_interval=0.1)[source]

try to stop the threadpool.

If it cannot stop the pool RIGHT NOW, will NOT block.