cup.services package¶
@author Guannan Ma @mythmgn
Submodules¶
cup.services.autowait module¶
- description:
auto wait related modules.
- cup.services.autowait.wait_until_file_exist(dst_path, file_name, max_wait_sec=10, interval_sec=2, recursive=False)[source]¶
wait until the file exists or the function times out
- Parameters:
dst_path – searching path
file_name – filename, support *
max_wait_sec – max waiting time until timeout
interval_sec – check interval
recursive – recursively search or not
- Returns:
True if found before timeout, False otherwise.
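The behavior above can be sketched as a simple polling loop. The following is a minimal stdlib-only illustration (not the actual cup implementation), using glob to honor the * wildcard in file_name:

```python
import glob
import os
import tempfile
import time


def wait_until_file_exist(dst_path, file_name, max_wait_sec=10, interval_sec=2):
    """Poll dst_path for a file matching file_name (supports * wildcards)."""
    deadline = time.time() + max_wait_sec
    while time.time() < deadline:
        if glob.glob(os.path.join(dst_path, file_name)):
            return True
        time.sleep(interval_sec)
    return False


# usage: create a file, then wait for it with a wildcard pattern
tmpdir = tempfile.mkdtemp()
open(os.path.join(tmpdir, 'ready.flag'), 'w').close()
found = wait_until_file_exist(tmpdir, 'ready.*', max_wait_sec=2, interval_sec=0.1)
print(found)
```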
- cup.services.autowait.wait_until_port_not_used(port, max_wait_sec=10, interval_sec=0.5)[source]¶
wait until the port is free
- Returns:
return True if the port is free before timeout
- cup.services.autowait.wait_until_port_used(port, max_wait_sec=10, interval_sec=0.5)[source]¶
wait until the port is used. Note that this function will invoke a bash shell to execute the [netstat] command!
- Returns:
return True if the port is used
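The same idea can be sketched without shelling out to netstat, by probing the port with a plain socket connect. This is an illustration of the polling pattern, not cup's actual implementation (which uses [netstat]):

```python
import socket
import time


def port_in_use(port, host='127.0.0.1'):
    """Return True if something accepts TCP connections on (host, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(0.5)
        return sock.connect_ex((host, port)) == 0


def wait_until_port_used(port, max_wait_sec=10, interval_sec=0.5):
    """Poll until the port is in use, or time out."""
    deadline = time.time() + max_wait_sec
    while time.time() < deadline:
        if port_in_use(port):
            return True
        time.sleep(interval_sec)
    return False


# usage: listen on an ephemeral port, then wait for it to show as used
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))      # port 0 lets the OS pick a free port
server.listen(1)
port = server.getsockname()[1]
result = wait_until_port_used(port, max_wait_sec=2, interval_sec=0.1)
server.close()
print(result)
```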
- cup.services.autowait.wait_until_process_exist(process_path, max_wait_sec=10, interval_sec=0.5)[source]¶
wait until the process exists
- Parameters:
process_path – the specific process working path
max_wait_sec – maximum waiting time until timeout
interval_sec – state check interval
- Returns:
return True if the process is found before timeout
- cup.services.autowait.wait_until_process_killed(process_path, ports, max_wait_sec=10, interval_sec=0.5)[source]¶
wait until the [process] no longer exists and all [ports] are free
- Parameters:
process_path – process cwd
ports – port list
interval_sec – state check interval
- Returns:
True if all conditions meet.
- cup.services.autowait.wait_until_process_not_exist(process_path, max_wait_sec=10, interval_sec=0.5)[source]¶
wait until the process no longer exists or the function times out
- Parameters:
process_path – process cwd
max_wait_sec – maximum waiting time until timeout. 10 seconds by default
interval_sec – state check interval, 0.5 second by default
- Returns:
return True if the process disappears before timeout
- cup.services.autowait.wait_until_process_used_ports(process_path, ports, max_wait_sec=10, interval_sec=0.5)[source]¶
wait until the process has taken all the ports before timing out
- Returns:
True if all ports are used by the specific process. False, otherwise
- cup.services.autowait.wait_until_reg_str_exist(dst_file_path, reg_str, max_wait_sec=10, interval_sec=0.5)[source]¶
wait until any line in the file matches the reg_str (regular expression string)
- Parameters:
dst_file_path – searching path
reg_str – regular expression string
max_wait_sec – maximum waiting time until timeout
interval_sec – state check interval
- Returns:
True if found
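The wait-for-log-line behavior can be sketched as a polling loop over the file; a minimal stdlib-only illustration (not the actual cup implementation), tolerating a file that does not exist yet:

```python
import re
import tempfile
import time


def wait_until_reg_str_exist(dst_file_path, reg_str, max_wait_sec=10, interval_sec=0.5):
    """Poll the file until any line matches reg_str, or time out."""
    pattern = re.compile(reg_str)
    deadline = time.time() + max_wait_sec
    while time.time() < deadline:
        try:
            with open(dst_file_path) as fhandle:
                if any(pattern.search(line) for line in fhandle):
                    return True
        except IOError:
            pass                      # file may not exist yet; keep polling
        time.sleep(interval_sec)
    return False


# usage: wait for a "server started" line in a log file
with tempfile.NamedTemporaryFile('w', suffix='.log', delete=False) as log:
    log.write('booting\nserver started on port 8080\n')
    logname = log.name
found = wait_until_reg_str_exist(logname, r'server started on port \d+',
                                 max_wait_sec=2, interval_sec=0.1)
print(found)
```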
cup.services.buffers module¶
- description:
buffer pool
- class cup.services.buffers.Buffer(items, block_size, uniqid)[source]¶
Bases:
object
Buffer object which you get from BufferPool.allocate(num).
A Buffer consists of several bytearrays, which are mutable, unlike a normal str. In other words, if you have scenarios in which you allocate and deallocate memory frequently, you can often replace str with Buffer directly. Doing so reduces memory fragmentation.
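The mutability point can be seen with a plain bytearray, the building block a Buffer is made of; a short demo of in-place writes versus str's copy-on-modify:

```python
# str is immutable: every "modification" allocates a new object.
chunk = 'x' * 8
new_chunk = chunk.replace('x', 'y')    # a second, separate 8-char object

# bytearray is mutable: writes happen in place and the buffer is reused,
# which is what lets a Buffer avoid piling up short-lived allocations.
buf = bytearray(8)
buf[0:4] = b'head'
buf[4:8] = b'tail'
assert bytes(buf) == b'headtail'
buf[0:4] = b'HEAD'                     # overwrite in place, no new buffer
assert bytes(buf) == b'HEADtail'
print(bytes(buf))
```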
cup.services.executor module¶
- description:
Delay-execute something after several seconds
Schedule some tasks in a queue.
- class cup.services.executor.AbstractExecution(delay_exe_thdnum, queue_exec_thdnum, name)[source]¶
Bases:
object
abstract execution service
- class cup.services.executor.CronExecution(threads_num=3, name=None)[source]¶
Bases:
ExecutionService
run execution like cron.
- Please note the following behavior:
- if the previous task is still running when the scheduled time comes,
the executor will wait until the previous task finishes
- class cup.services.executor.CronTask(name, pytz_timezone, timer_dict, md5uuid, function, *args, **kwargs)[source]¶
Bases:
object
crontask for CronExecution
Typical examples for timer_dict:
1. one specific time Jan 1st, 2020 18:01
timer_dict { 'minute': [1], 'hour': [18], 'weekday': None, 'monthday': [1], 'month': [1] }
2. every minute
timer_dict { 'minute': [0, 1, 2, ..., 59], 'hour': None, 'weekday': None, 'monthday': None, 'month': None }
3. every hour
timer_dict { 'minute': [0], 'hour': None, 'weekday': None, 'monthday': None, 'month': None }
4. every day at 20:00
timer_dict { 'minute': [0], 'hour': [20], 'weekday': None, 'monthday': None, 'month': None }
5. every workday (Monday to Friday) at 20:00
timer_dict { 'minute': [0], 'hour': [20], 'weekday': [1, 2, 3, 4, 5], 'monthday': None, 'month': None }
6. every workday at 20:00 in Jan and July
timer_dict { 'minute': [0], 'hour': [20], 'weekday': [1, 2, 3, 4, 5], 'monthday': None, 'month': [1, 7] }
7. at 20:00 on the 1st and 3rd of Jan and July
timer_dict { 'minute': [0], 'hour': [20], 'weekday': None, 'monthday': [1, 3], 'month': [1, 7] }
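To make the timer_dict semantics concrete, here is a hypothetical matches() helper (not part of cup) that treats None as "any value" and checks a datetime against a timer_dict, numbering weekdays 1=Monday through 7=Sunday as in the examples above:

```python
import datetime


def matches(timer_dict, when):
    """Hypothetical check: does datetime `when` satisfy a timer_dict?

    None means "any value"; a list enumerates the allowed values.
    """
    fields = {
        'minute': when.minute,
        'hour': when.hour,
        'weekday': when.isoweekday(),   # 1=Monday .. 7=Sunday
        'monthday': when.day,
        'month': when.month,
    }
    return all(timer_dict[key] is None or fields[key] in timer_dict[key]
               for key in fields)


# example 5 above: every workday at 20:00
workdays_8pm = {'minute': [0], 'hour': [20],
                'weekday': [1, 2, 3, 4, 5], 'monthday': None, 'month': None}
friday_8pm = datetime.datetime(2024, 3, 1, 20, 0)   # 2024-03-01 is a Friday
print(matches(workdays_8pm, friday_8pm))
```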
- classmethod check_monthday_weekday(tmp_dict, timer_params)[source]¶
check whether the monthday / weekday values are valid
- get_last_schedtime()[source]¶
get the last scheduled time as a datetime.datetime object. Please note that the timezone is enabled
- classmethod next_hour(tmp_dict, timer_params)[source]¶
- Returns:
{ 'year': xxx, 'month': xxx, 'monthday': ... }
- classmethod next_minute(tmp_dict, timer_params)[source]¶
- Returns:
{ 'year': xxx, 'month': xxx, 'monthday': ... }
- classmethod next_month(tmp_dict, timer_params)[source]¶
set tmp_dict to next valid date, specifically month
- Parameters:
tmp_dict –
{ 'year': xxxx, 'month': xxxx, 'monthday': xxxx, 'weekday': xxxx, 'hour': xxxx, 'minute': xxxx }
timer_params – valid timer dict, same to self._timer_params
- class cup.services.executor.ExecutionService(delay_exe_thdnum=3, queue_exec_thdnum=4, name=None)[source]¶
Bases:
AbstractExecution
execution service
- delay_exec(delay_time_insec, function, urgency, *args, **kwargs)[source]¶
delay-execute [function] after [delay_time_insec] seconds
You can pass urgency=executor.URGENCY_NORMAL, which is the default
- TODO:
consider stopping timers when the stop function is invoked
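The delay-execute idea can be sketched with a stdlib threading.Timer. The real ExecutionService manages its own pool of worker threads, so this is only an illustration of the concept:

```python
import threading


def delay_exec(delay_time_insec, function, *args, **kwargs):
    """Minimal sketch of delayed execution via a one-shot timer thread."""
    timer = threading.Timer(delay_time_insec, function, args=args, kwargs=kwargs)
    timer.daemon = True       # do not keep the process alive for pending timers
    timer.start()
    return timer              # the caller may cancel() before it fires


# usage: append to a list 0.1 seconds from now
results = []
timer = delay_exec(0.1, results.append, 'done')
timer.join()                  # wait for the delayed call (demo only)
print(results)
```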
cup.services.generator module¶
class CGeneratorMan(object)¶
Generate unique integers, strings and auto incremental uint. Notice CGeneratorMan is a singleton class, which means cup will keep only 1 instance per process.
- init:
- __init__(self, str_prefix=get_local_hostname())
the local hostname will be used as the default prefix.
- methods:
- get_uniqname()
get a unique name. Host-level unique (built upon str_prefix, pid, thread id)
- get_next_uniq_num()
Process-level auto incremental uint. Thread-safe
- reset_uniqid_start(num=0)
Reset the next uniqid so that the generator starts counting from num again
- get_random_str()
Get a random string of the given length
- get_uuid()
Get uuid
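The methods above can be sketched with a small stdlib-only class. Names and details here are illustrative assumptions, not cup's actual implementation (which, unlike this sketch, is a singleton):

```python
import itertools
import os
import socket
import threading
import uuid


class GeneratorMan(object):
    """Sketch of a process-wide generator similar to CGeneratorMan."""

    def __init__(self, str_prefix=None):
        # local hostname is the default prefix, as in CGeneratorMan
        self._prefix = str_prefix or socket.gethostname()
        self._lock = threading.Lock()
        self._counter = itertools.count(0)

    def get_uniqname(self):
        # host-level unique: prefix + pid + thread id
        return '{0}_{1}_{2}'.format(
            self._prefix, os.getpid(), threading.current_thread().ident)

    def get_next_uniq_num(self):
        # guard with a lock so the increment is thread-safe
        with self._lock:
            return next(self._counter)

    def get_uuid(self):
        return str(uuid.uuid4())


gen = GeneratorMan()
print(gen.get_next_uniq_num(), gen.get_next_uniq_num())
```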
cup.services.heartbeat module¶
- description:
heartbeat related module
- class cup.services.heartbeat.Device(name)[source]¶
Bases:
object
Base class for all devices in heartbeat service
- class cup.services.heartbeat.HeartbeatService(judge_lost_in_sec, keep_lost=False)[source]¶
Bases:
object
HeartBeat service
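The judge_lost_in_sec parameter implies the usual heartbeat bookkeeping: a device counts as lost when its last heartbeat is older than the threshold. A minimal sketch with hypothetical class and method names, not cup's actual API:

```python
import time


class HeartbeatTracker(object):
    """Sketch: track last-heartbeat times and judge which devices are lost."""

    def __init__(self, judge_lost_in_sec):
        self._judge_lost_in_sec = judge_lost_in_sec
        self._last_seen = {}

    def refresh(self, name, now=None):
        """Record a heartbeat from the named device."""
        self._last_seen[name] = time.time() if now is None else now

    def lost_devices(self, now=None):
        """Names whose last heartbeat is older than judge_lost_in_sec."""
        now = time.time() if now is None else now
        return [name for name, seen in self._last_seen.items()
                if now - seen > self._judge_lost_in_sec]


tracker = HeartbeatTracker(judge_lost_in_sec=5)
tracker.refresh('host-a', now=100.0)
tracker.refresh('host-b', now=104.0)
print(tracker.lost_devices(now=106.0))   # host-a is 6s stale, host-b only 2s
```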
- class cup.services.heartbeat.LinuxHost(name, init_this_host=False, iface='eth0', port=0)[source]¶
Bases:
Device
a linux host resource
- set_ip_port(ipaddr)[source]¶
set ip information
- Parameters:
ipaddr – ipaddr should be string and something like 10.10.10.1
- set_linux_res_bydict(info_dict)[source]¶
- Parameters:
info_dict –
{ 'iface': 'eth0',
'ipaddr': '10.10.10.1',
'port': 8089,
'cpu_idle': 50,
'mem_inuse': 1024,  # MB
'mem_total': 8192,
'net_in': 8192,  # kb
'net_out': 102400,  # kb
}
- class cup.services.heartbeat.MacHost(name, init_this_host=False, iface='en01', port=0)[source]¶
Bases:
Device
a mac host
cup.services.msgbroker module¶
- description:
Msg Broker Service. Every component of a process can produce_msg.
This msg broker feature is still experimental. Do not use it in production until this comment is deleted.
- class cup.services.msgbroker.SystemErrmsgBroker(name)[source]¶
Bases:
BrokerCenter
system errmsg broker; you can use it to determine whether a system exit is underway
- register_msgtype_callback(path, msg_type, callback_func)[source]¶
register msgtype with callback functions
cup.services.serializer module¶
- description:
serializers, including a local file serializer
- class cup.services.serializer.LocalFileSerilizer(storage_dir, skip_badlog=False, max_logfile_size=1048576, persist_after_sec=600)[source]¶
Bases:
BaseSerilizer
local file serializer
- add_log(log_type, log_mode, log_binary)[source]¶
add log into the local file
- Returns:
a tuple (result_True_or_False, logid_or_None)
- open4write(truncate_last_failure=True)[source]¶
- Raises:
Exception – if any IOError is encountered, IOError(errmsg) will be raised
- purge_data(before_logid)[source]¶
log files which contain logs (with logid less than before_logid) will be purged.
- read(record_num=128)[source]¶
load log into memory
- Notice:
If skip_badlog is not True, the stream will raise IOError when it encounters any error.
Otherwise, the stream will skip any bad log file, move to the next one and continue reading.
- Returns:
a. a list of up to record_num LogRecord objects.
b. If the list contains fewer than record_num records, the stream encountered EOF; please read again afterwards.
c. If the return value is None, the stream got nothing; please try again.
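The add_log / read contract (binary records appended to a local file, with a short read signalling EOF) can be illustrated with a stdlib-only, length-prefixed record file. This is a hedged sketch under assumed framing, not cup's actual on-disk format:

```python
import os
import struct
import tempfile


def append_record(path, payload):
    """Append one length-prefixed binary record (4-byte big-endian size)."""
    with open(path, 'ab') as fhandle:
        fhandle.write(struct.pack('>I', len(payload)))
        fhandle.write(payload)


def read_records(path, record_num=128):
    """Read up to record_num records; a short list means EOF was reached."""
    records = []
    with open(path, 'rb') as fhandle:
        while len(records) < record_num:
            header = fhandle.read(4)
            if len(header) < 4:
                break                   # EOF: no complete header left
            size = struct.unpack('>I', header)[0]
            records.append(fhandle.read(size))
    return records


# usage: append two records, then read them back
logfile = os.path.join(tempfile.mkdtemp(), 'binlog.data')
append_record(logfile, b'first')
append_record(logfile, b'second')
print(read_records(logfile))
```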
cup.services.threadpool module¶
Note
Guannan back-ported threadpool from twisted.python. If you have any concerns, feel free to contact Guannan (mythmgn@gmail.com)
Note
- MIT License applied for twisted:
http://www.opensource.org/licenses/mit-license.php
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- class cup.services.threadpool.ThreadPool(minthreads=5, maxthreads=20, name=None, daemon_threads=False)[source]¶
Bases:
object
Threadpool class
- add_1job(func, *args, **kwargs)[source]¶
Add one job that you want the pool to schedule.
Caution
Note that if you need to handle data after [func] finishes, please use [add_1job_with_callback], which supports a [callback] option.
- Parameters:
func – function that will be scheduled by the thread pool
args – args that the [func] needs
kwargs – kwargs that [func] needs
- add_1job_with_callback(result_callback, func, *args, **kwargs)[source]¶
- Parameters:
result_callback –
Important
please note that result_callback will be called after [func] finishes, whether [func] succeeds or fails.
The result_callback function must accept two parameters: (ret_in_bool, result). (True, result) will be passed on success; (False, result) will be passed otherwise.
If [func] raises any Exception, result_callback will receive (False, failure_info) as well.
func – same to func for add_1job
args – args for [func]
kwargs – kwargs for [func]
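The callback contract described above can be sketched as follows; run_with_callback is a hypothetical stand-in for what the pool does per job, not cup's actual implementation:

```python
import threading


def run_with_callback(result_callback, func, *args, **kwargs):
    """Run func on a worker thread and always report back through
    result_callback(ret_in_bool, result): (True, result) on success,
    (False, the_exception) on failure."""
    def _worker():
        try:
            result = func(*args, **kwargs)
        except Exception as err:        # report failure instead of raising
            result_callback(False, err)
        else:
            result_callback(True, result)
    thd = threading.Thread(target=_worker)
    thd.start()
    return thd


# usage: one succeeding job, one failing job
outcomes = []
record = lambda ok, result: outcomes.append((ok, result))
run_with_callback(record, lambda: 6 * 7).join()
run_with_callback(record, lambda: 1 / 0).join()
print(outcomes)
```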
- dump_stats(print_stdout=False)[source]¶
Dump the threadpool stats to log or stdout. The info comes from the class method [get_stats].
- get_stats()[source]¶
get threadpool running stats: waiters_num is the number of pending threads, working_num is the number of working threads, thread_num is the total number of threads
stat = {}
stat['queue_len'] = self._jobqueue.qsize()
stat['waiters_num'] = len(self._waiters)
stat['working_num'] = len(self._working)
stat['thread_num'] = len(self._threads)