cup.services package

Submodules

cup.services.autowait module

description:auto wait related modules.
cup.services.autowait.wait_until_file_exist(dst_path, file_name, max_wait_sec=10, interval_sec=2, recursive=False)[source]

wait until the file exists or the function times out

Parameters:
  • dst_path – searching path
  • file_name – file name; the * wildcard is supported
  • max_wait_sec – maximum waiting time until timeout
  • interval_sec – check interval
  • recursive – recursively search or not
Returns:

True if found.
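
The polling behaviour described above can be sketched as follows. This is a minimal illustration, not cup's implementation: the function name wait_for_file is hypothetical, and the use of fnmatch for the * wildcard is an assumption about how matching might work.

```python
import fnmatch
import os
import time


def wait_for_file(dst_path, file_name, max_wait_sec=10, interval_sec=2,
                  recursive=False):
    """Poll dst_path until a file matching file_name appears or time runs out."""
    deadline = time.monotonic() + max_wait_sec
    while True:
        if recursive:
            # os.walk yields (dirpath, dirnames, filenames) for every subdirectory
            candidates = [
                name for _, _, names in os.walk(dst_path) for name in names
            ]
        elif os.path.isdir(dst_path):
            candidates = [
                name for name in os.listdir(dst_path)
                if os.path.isfile(os.path.join(dst_path, name))
            ]
        else:
            candidates = []
        # fnmatch handles shell-style wildcards such as '*'
        if any(fnmatch.fnmatch(name, file_name) for name in candidates):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_sec)
```

The check runs at least once, so a file that already exists is reported even with a very short max_wait_sec.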

cup.services.autowait.wait_until_reg_str_exist(dst_file_path, reg_str, max_wait_sec=10, interval_sec=0.5)[source]

wait until any line in the file matches reg_str (a regular expression string)

Parameters:
  • dst_file_path – path of the file to search
  • reg_str – regular expression string
  • max_wait_sec – maximum waiting time until timeout
  • interval_sec – state check interval
Returns:

True if found
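
A minimal sketch of this kind of wait, assuming the pattern is tested against each line with re.search. Whether cup uses search or match semantics is not stated here, and the name wait_for_pattern is hypothetical.

```python
import re
import time


def wait_for_pattern(dst_file_path, reg_str, max_wait_sec=10, interval_sec=0.5):
    """Poll a text file until some line matches reg_str or time runs out."""
    pattern = re.compile(reg_str)
    deadline = time.monotonic() + max_wait_sec
    while True:
        try:
            with open(dst_file_path, "r", errors="replace") as handle:
                # Assumption: a match anywhere in the line counts
                if any(pattern.search(line) for line in handle):
                    return True
        except FileNotFoundError:
            pass  # the file may not have been created yet; keep polling
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_sec)
```

Re-reading the whole file on every poll is simple but costly for large logs; remembering the last read offset would be a natural refinement.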

cup.services.autowait.wait_until_process_not_exist(process_path, max_wait_sec=10, interval_sec=0.5)[source]

wait until the process no longer exists or the function times out

Parameters:
  • process_path – process cwd
  • max_wait_sec – maximum waiting time until timeout. 10 seconds by default
  • interval_sec – state check interval, 0.5 second by default
Returns:

True if the process disappears before timeout

cup.services.autowait.wait_until_port_used(port, max_wait_sec=10, interval_sec=0.5)[source]

wait until the port is in use. Note that this function invokes a bash shell to run the netstat command.

Returns:True if the port is in use
cup.services.autowait.wait_until_process_used_ports(process_path, ports, max_wait_sec=10, interval_sec=0.5)[source]

wait until the process has taken the ports or the function times out

Returns:True if all ports are used by the specific process. False, otherwise
cup.services.autowait.wait_until_port_not_used(port, max_wait_sec=10, interval_sec=0.5)[source]

wait until the port is free

Returns:True if the port is free before timeout
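
The port waits above can be approximated without a shell. The sketch below swaps in a different technique from the netstat call that cup uses: it probes the port with a TCP connect on localhost. The names is_port_in_use and wait_for_port, and the want_used flag, are hypothetical.

```python
import socket
import time


def is_port_in_use(port, host="127.0.0.1"):
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(0.5)
        # connect_ex returns 0 on success instead of raising
        return sock.connect_ex((host, port)) == 0


def wait_for_port(port, want_used=True, max_wait_sec=10, interval_sec=0.5):
    """Poll until the port reaches the wanted state or time runs out."""
    deadline = time.monotonic() + max_wait_sec
    while True:
        if is_port_in_use(port) == want_used:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_sec)
```

A connect probe only sees listening TCP sockets reachable on the given host, so it is not equivalent to inspecting netstat output.
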
cup.services.autowait.wait_until_process_exist(process_path, max_wait_sec=10, interval_sec=0.5)[source]

wait until the process exists

Parameters:
  • process_path – the specific process working path
  • max_wait_sec – maximum waiting time until timeout
  • interval_sec – state check interval
Returns:

True if the process is found before timeout

cup.services.autowait.wait_until_process_killed(process_path, ports, max_wait_sec=10, interval_sec=0.5)[source]

wait until the [process] no longer exists and all [ports] are free

Parameters:
  • process_path – process cwd
  • ports – port list
  • max_wait_sec – maximum waiting time until timeout
  • interval_sec – state check interval
Returns:

True if all conditions are met.

cup.services.buffers module

description:buffer pool
class cup.services.buffers.Buffer(items, block_size, uniqid)[source]

Bases: object

Buffer object which you get from BufferPool.allocate(num)

get()[source]

return (True, (content, block_size, total_length)) on success

Otherwise, return (False, err_msg, None)

get_byte_arrays()[source]

get byte arrays in the buffer

get_uniq_id()[source]

return the uniqid for this object

set(content)[source]

return (True, None) on success.

return (False, error_msg) otherwise

class cup.services.buffers.BufferPool(pool_size, block_size=135168, extendable=False)[source]

Bases: object

buffer pool class that reduces memory fragmentation

allocate(num)[source]

allocate a buffer of num * block_size

Returns:(True, Buffer object) on success

(False, str_error_msg) otherwise

deallocate(buff)[source]

return the allocated buff back to the pool
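
The idea behind a block-based pool can be sketched as below. This is an illustration under assumptions, not cup's code: SimpleBufferPool and free_blocks are hypothetical names, and the sketch hands back raw bytearray blocks instead of a Buffer object.

```python
import threading


class SimpleBufferPool:
    """Hypothetical pool of fixed-size bytearray blocks (not cup's code)."""

    def __init__(self, pool_size, block_size=135168):
        self._block_size = block_size
        # Preallocate every block once so later requests reuse the same memory
        self._free = [bytearray(block_size) for _ in range(pool_size)]
        self._lock = threading.Lock()

    def allocate(self, num):
        """Return (True, blocks) on success or (False, error message)."""
        with self._lock:
            if num > len(self._free):
                return (False, "not enough free blocks")
            blocks = [self._free.pop() for _ in range(num)]
        return (True, blocks)

    def deallocate(self, blocks):
        """Give previously allocated blocks back to the pool."""
        with self._lock:
            self._free.extend(blocks)

    def free_blocks(self):
        with self._lock:
            return len(self._free)
```

Because blocks are allocated once and recycled, repeated allocate and deallocate calls do not create new memory regions, which is the property that limits fragmentation.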

cup.services.executor module

description:
  1. Execute something after a delay of several seconds.
  2. Schedule some tasks in a queue.
class cup.services.executor.ExecutionService(delay_exe_thdnum=3, queue_exec_thdnum=4)[source]

Bases: object

execution service

delay_exec(delay_time_insec, function, urgency, *args, **kwargs)[source]

execute function after delay_time_insec seconds

You can use urgency := executor.URGENCY_NORMAL as the default choice.

TODO:consider stopping timers when the stop function is invoked
queue_exec(function, urgency, *argvs, **kwargs)[source]

execute function in a queue. Functions are queued and wait in line to be scheduled.

You can use urgency := executor.URGENCY_NORMAL as the default choice.

run()[source]

Delayexec worker checks tasks every 20ms.

QueueExec worker checks tasks every 100ms.

stop(wait_workerstop=True)[source]

stop the executor service.

Parameters:wait_workerstop – if True, the function blocks until all workers finish their tasks. Otherwise, the function does not block, but tells you whether the service has stopped (True for stopped, False for not stopped yet).
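
Queued execution with an urgency level can be sketched with a priority queue and one worker thread. Everything here is an assumption made for illustration: the class TinyExecutor, the numeric urgency values, and the single worker are not taken from cup.

```python
import itertools
import queue
import threading

# Hypothetical urgency values; a lower number is scheduled first
URGENCY_HIGH, URGENCY_NORMAL, URGENCY_LOW = 0, 1, 2


class TinyExecutor:
    def __init__(self):
        self._tasks = queue.PriorityQueue()
        self._seq = itertools.count()  # tie-breaker keeps FIFO order per urgency
        self._worker = threading.Thread(target=self._loop, daemon=True)

    def queue_exec(self, function, urgency, *args, **kwargs):
        self._tasks.put((urgency, next(self._seq), function, args, kwargs))

    def run(self):
        self._worker.start()

    def stop(self):
        # A sentinel with the lowest priority lets queued tasks drain first
        self._tasks.put((float("inf"), next(self._seq), None, (), {}))
        self._worker.join()

    def _loop(self):
        while True:
            _, _, function, args, kwargs = self._tasks.get()
            if function is None:
                return
            function(*args, **kwargs)
```

In this sketch stop() always waits for the worker, which corresponds to the wait_workerstop=True behaviour only.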

cup.services.generator module

class CGeneratorMan(object)

Generate unique integers, unique strings, and auto-incrementing uints. Note that CGeneratorMan is a singleton class, which means cup keeps only one instance per process.

init:
__init__(self, str_prefix=get_local_hostname())

local hostname will be used by default.

methods:
get_uniqname()

get a host-level unique name (built from str_prefix, pid, and thread id)

get_next_uniq_num()

Process-level auto-incrementing uint. Thread-safe.

reset_uniqid_start(num=0)

Reset the number from which the next uniqid starts

get_random_str()

Get random string by length

get_uuid()

Get uuid
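
A thread-safe, process-level counter like the one behind get_next_uniq_num and reset_uniqid_start can be sketched with a lock. The class name UniqNumGenerator is hypothetical, and whether cup returns the start value itself as the first number is an assumption.

```python
import threading


class UniqNumGenerator:
    """Hypothetical auto-incrementing counter guarded by a lock."""

    def __init__(self, start=0):
        self._next = start
        self._lock = threading.Lock()

    def get_next_uniq_num(self):
        # The lock makes read-and-increment a single atomic step
        with self._lock:
            value = self._next
            self._next += 1
            return value

    def reset_uniqid_start(self, num=0):
        with self._lock:
            self._next = num
```

Without the lock, two threads could read the same value before either increments it and hand out a duplicate.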

class cup.services.generator.CycleIDGenerator(ip, port)[source]

Bases: object

128-bit cycle id generator.

Layout: 64 bits [ip, port, etc.] followed by 64 bits [auto-increment id]

classmethod id2_hexstring(num)[source]

return hex of the id

next_id()[source]

get next id

reset_nextid(nextid)[source]

reset the next id that will be returned to you
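
One way to build such a 128-bit id is sketched below. The exact bit layout is an assumption: the sketch places a 32-bit IPv4 address, a 16-bit port, and 16 spare bits in the high half, and treats "cycle" as wrap-around of the 64-bit counter. The class name CycleId128 is hypothetical.

```python
import ipaddress
import threading


class CycleId128:
    _MASK64 = (1 << 64) - 1

    def __init__(self, ip, port):
        # High 64 bits (assumed): 32-bit IPv4, 16-bit port, 16 spare bits
        ip_int = int(ipaddress.IPv4Address(ip))
        self._high = (ip_int << 32) | ((port & 0xFFFF) << 16)
        self._counter = 0
        self._lock = threading.Lock()

    def next_id(self):
        with self._lock:
            value = (self._high << 64) | self._counter
            # Wrap around at 2**64 so the low half cycles
            self._counter = (self._counter + 1) & self._MASK64
        return value

    def reset_nextid(self, nextid):
        with self._lock:
            self._counter = nextid & self._MASK64

    @classmethod
    def id2_hexstring(cls, num):
        # 32 hex digits cover all 128 bits
        return "0x{:032x}".format(num)
```

Putting the address and port in the high half keeps ids from different endpoints distinct even when their counters collide.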

cup.services.heartbeat module

description:heartbeat related module
class cup.services.heartbeat.Device(name)[source]

Bases: object

Base class for all devices in heartbeat service

deserilize(binary)[source]

deserialize it from binary

get_dict_resinfo()[source]

get dict of resource info

get_last_healthy()[source]

get last_healthy time of the device

get_name()[source]

get name

serilize()[source]

serialize device info

set_last_healthy()[source]

set last_healthy time

class cup.services.heartbeat.HeartbeatService(judge_lost_in_sec, keep_lost=False)[source]

Bases: object

HeartBeat service

adjust_judge_lost_time(time_in_sec)[source]

adjust judge_lost_in_sec

cleanup_oldlost(dump_file=None)[source]

cleanup old lost devices.

Parameters:dump_file – if dump_file is not None, the device info is stored in dump_file. Otherwise, the lost devices are only cleaned up.
get_lost()[source]

get lost devices

is_device_registered(key, including_dead=False)[source]

tell if the device is registered

refresh(key, device_obj=None)[source]
Parameters:key – refresh the device by key
Returns:False if key does not exist; otherwise, refreshes the last_healthy time of the device
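
The refresh and get_lost behaviour can be sketched as a table of last-healthy timestamps. This is an illustration under assumptions, not cup's code: TinyHeartbeat, its register method, and the optional now argument (used to make the example deterministic) are all hypothetical.

```python
import time


class TinyHeartbeat:
    def __init__(self, judge_lost_in_sec):
        self._judge_lost_in_sec = judge_lost_in_sec
        self._last_healthy = {}  # key -> timestamp of the latest refresh

    def register(self, key, now=None):
        self._last_healthy[key] = time.time() if now is None else now

    def refresh(self, key, now=None):
        """Return False for an unknown key, else update its timestamp."""
        if key not in self._last_healthy:
            return False
        self._last_healthy[key] = time.time() if now is None else now
        return True

    def get_lost(self, now=None):
        """Keys whose last refresh is older than judge_lost_in_sec."""
        now = time.time() if now is None else now
        return [
            key for key, seen in self._last_healthy.items()
            if now - seen > self._judge_lost_in_sec
        ]
```

A device counts as lost here only while it stays silent; a later refresh makes it healthy again, which may differ from what keep_lost controls in cup.
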
class cup.services.heartbeat.LinuxHost(name, init_this_host=False, iface='eth0', port=0)[source]

Bases: cup.services.heartbeat.Device

a linux host resource

get_cpu_idle()[source]

get cpu idle rate

get_ip()[source]

return ip information

get_ip_port()[source]

return ip:port

get_mem_info()[source]
Returns:(mem_inuse, mem_total), in MB
get_net_usage()[source]
Returns:(net_in, net_out)
set_cpu_idle(idle_rate)[source]

set cpu idle rate

set_ip_port(ipaddr)[source]

set ip information

Parameters:ipaddr – ipaddr should be a string such as 10.10.10.1
set_linux_res_bydict(info_dict)[source]
{
    'iface': 'eth0',
    'ipaddr': '10.10.10.1',
    'port': 8089,
    'cpu_idle': 50,
    'mem_inuse': 1024,  # MB
    'mem_total': 8192,
    'net_in': 8192,  # kb
    'net_out': 102400,  # kb
}

set_mem_usage(mem_inuse, mem_total)[source]

set mem_inuse and mem_total. Each value is updated only if it is not None.

set_net_usage(net_in, net_out)[source]
Parameters:
  • net_in – net_in in kB/s. If net_in is None, will update nothing.
  • net_out – net_out in kB/s. If net_out is None, will update nothing.

cup.services.msgbroker module

description:Msg Broker Service. Every component of a process can call produce_msg.
class cup.services.msgbroker.BrokerCenter(name)[source]

Bases: cup.services.msgbroker.BaseBroker

Errmsg broker center

comsume_msg(msg_type)[source]

get msg_type from the broker center

produce_msg(msg_type, extra_info, error)[source]

register msg

class cup.services.msgbroker.SystemErrmsgBroker(name)[source]

Bases: cup.services.msgbroker.BrokerCenter

System errmsg broker. You can use it to determine whether the system is about to exit.

clean_data(path, exclude_msgtypes=None)[source]

clean up the remaining data

fatal_alert(path, msg, need_stop=True)[source]

fatal alert systems

get_fatal_alerts(path)[source]

get fatal alerts of the current running round

need_stop(path)[source]
return True if the system registered on the path needs to stop immediately
register_msg(path, msgtype, msg)[source]

register msg into the system

register_msgtype_callback(path, msg_type, callback_func)[source]

register msgtype with callback functions

register_wakeup(path, msgtype, alert_cap_num, callfunc)[source]

register wakeups.

Parameters:
  • alert_cap_num – If alert_cap_num is 0, whenever a msg of msgtype is received, the callfunc will be called.
  • msgtype – [msgbroker.FATAL|msgbroker.WARN]
warnning_alert(path, msg)[source]

warning alert

cup.services.serializer module

description:serializers, including a local file serializer
class cup.services.serializer.LogRecord(length, log_id, log_type, log_mode, log_binary)

Bases: tuple

length

Alias for field number 0

log_binary

Alias for field number 4

log_id

Alias for field number 1

log_mode

Alias for field number 3

log_type

Alias for field number 2

class cup.services.serializer.LocalFileSerilizer(storage_dir, skip_badlog=False, max_logfile_size=65536)[source]

Bases: cup.services.serializer.BaseSerilizer

local file serializer

add_log(log_type, log_mode, log_binary)[source]

add log into the local file

close_read()[source]

close open4read

close_write()[source]

close the writer

get_next_logfile(logid)[source]

get current logfile

get_subdir(log_id=-1)[source]

get log dir

is_stream_open()[source]

is stream open

open4read()[source]

open logs for read

open4write(truncate_last_failure=True)[source]
Raises:Exception – if any IOError is encountered, IOError(errmsg) is raised
purge_data(before_logid)[source]

log files that contain logs with ids less than before_logid will be purged.

read(record_num=128)[source]

load log into memory

Notice:

If skip_badlog is not True, an IOError is raised if the stream encounters any error.

Otherwise, the stream skips the bad log file, moves to the next one, and continues reading.

Returns:

  a. A list of "record_num" LogRecords.

  b. If the list holds fewer than record_num items, the stream has reached EOF; read again later.

  c. If None is returned, the stream got nothing; try again.
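
The "read up to record_num records, a shorter list means EOF" contract can be sketched over an in-memory stream. The binary layout below is purely an assumption (cup's on-disk format is not described here), and write_record and read_records are hypothetical helpers. Unlike read, this sketch returns an empty list instead of None when nothing is available.

```python
import collections
import io
import struct

LogRecord = collections.namedtuple(
    "LogRecord", ["length", "log_id", "log_type", "log_mode", "log_binary"]
)

# Assumed header: length (uint32), log_id (uint64), type and mode (uint16 each)
_HEADER = struct.Struct("!IQHH")


def write_record(stream, log_id, log_type, log_mode, log_binary):
    stream.write(_HEADER.pack(len(log_binary), log_id, log_type, log_mode))
    stream.write(log_binary)


def read_records(stream, record_num=128):
    """Return up to record_num records; a shorter list means EOF was reached."""
    records = []
    while len(records) < record_num:
        header = stream.read(_HEADER.size)
        if len(header) < _HEADER.size:
            break  # EOF (or a truncated header)
        length, log_id, log_type, log_mode = _HEADER.unpack(header)
        payload = stream.read(length)
        if len(payload) < length:
            break  # truncated payload; treat as end of readable data
        records.append(LogRecord(length, log_id, log_type, log_mode, payload))
    return records
```

A caller would loop on read_records and treat a short batch as the signal to wait before reading again.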

set_current_logid(logid)[source]

reset current log id

cup.services.threadpool module

description:

Guannan back-ported threadpool from twisted.python. If you have any concerns, please contact Guannan (mythmgn@gmail.com).

license:
MIT License, as applied to twisted:

http://www.opensource.org/licenses/mit-license.php

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

class cup.services.threadpool.ThreadPool(minthreads=5, maxthreads=20, name=None, daemon_threads=False)[source]

Bases: object

Threadpool class

add_1job(func, *args, **kwargs)[source]

Add one job that you want the pool to schedule. Note that if you need to handle data after [func] finishes, please use [add_1job_with_callback], which supports a [callback] option.

Parameters:
  • func – function that will be scheduled by the thread pool
  • *args

    args that the [func] needs

  • **kwargs

    kwargs that [func] needs

add_1job_with_callback(result_callback, func, *args, **kwargs)[source]
Parameters:
  • result_callback

    Note that result_callback is called after [func] has run, whether [func] succeeded or failed.

    result_callback must accept two parameters: (ret_in_bool, result). (True, result) is passed to result_callback on success; (False, result) is passed otherwise.

    If [func] raises any Exception, result_callback receives (False, failure_info) as well.
  • func – same as func for add_1job
  • *args

    args for [func]

  • **kwargs

    kwargs for [func]
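
The callback convention above can be sketched with a plain thread in place of the pool. The helper run_with_callback is hypothetical, and passing the exception object as failure_info is an assumption; only the (ret_in_bool, result) calling convention comes from the description.

```python
import threading


def run_with_callback(result_callback, func, *args, **kwargs):
    """Run func in a thread and report (ok, result) to result_callback."""
    def _job():
        try:
            result = func(*args, **kwargs)
        except Exception as error:  # failures are reported, not raised
            result_callback(False, error)
        else:
            result_callback(True, result)

    worker = threading.Thread(target=_job)
    worker.start()
    return worker
```

The callback runs in the worker thread, so any state it touches needs the same care as other shared data.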

adjust_poolsize(minthreads=None, maxthreads=None)[source]

adjust pool size

dump_stats(print_stdout=False)[source]

Dump the threadpool stats to the log or stdout. The info comes from the method [get_stats].

get_stats()[source]

get threadpool running stats:

  • waiters_num – number of pending threads
  • working_num – number of working threads
  • thread_num – total number of threads

    stat = {}
    stat['queue_len'] = self._jobqueue.qsize()
    stat['waiters_num'] = len(self._waiters)
    stat['working_num'] = len(self._working)
    stat['thread_num'] = len(self._threads)
start()[source]

call start before you use the threadpool

start1worker()[source]

add a thread for worker threads in the pool

stop(force_stop=False)[source]

stop the thread pool. Note that calling this method blocks until all worker threads exit.

Parameters:force_stop – if True, try to stop the threads in the pool immediately (this may DAMAGE your code logic)
stop1worker()[source]

decrease one thread for the worker threads

try_stop(check_interval=0.1)[source]

try to stop the threadpool.

If the pool cannot be stopped RIGHT NOW, this method does NOT block.

Module contents

@author Guannan Ma @mythmgn