cup package

Submodules

cup.cache module

description:decorators related module
class cup.cache.KvCache[source]

Bases: object

Key-Value Cache object

cleanup_expired()[source]

Delete all expired items

clear()[source]

remove all kv cache inside.

get(key)[source]

Get your cache with key. If the cache is expired, it will return None. If the key does not exist, it will return None.

get_expired()[source]
Returns:A dict. Return expired items. Return type is a dict (
{
‘key’ : (value, expire_time)

}

)

set(kvlist, expire_sec=None)[source]

set cache with kvlist

Parameters:
  • kvlist – kvlist is a dict that contains your cache.
  • expire_sec – if expire_sec is None, the cache will never expire.
stat()[source]
Returns:a tuple with (item_num, expired_num)

cup.const module

internal const class

exception cup.const.ConstError(msg='')

Bases: cup.err.BaseCupException

const error

cup.decorators module

description:decorators related module
class cup.decorators.Singleton(cls)[source]

Bases: object

Make your class singeton

example:

from cup import decorators

@decorators.Singleton
class YourClass(object):
    def __init__(self):
    pass
class cup.decorators.TraceUsedTime(b_print_stdout=False, enter_msg='', leave_msg='')[source]

Bases: object

Trace used time inside a function.

Will print to LOGFILE if you initialized logging with cup.log.init_comlog.

example:

import time

from cup import decorators

@decorators.TraceUsedTime(True)
def test():
    print 'test'
    time.sleep(4)


# trace something with context. E.g. event_id
def _test_trace_time_map(sleep_time):
    print "ready to work"
    time.sleep(sleep_time)


traced_test_trace_time_map = decorators.TraceUsedTime(
    b_print_stdout=False,
    enter_msg='event_id: 0x12345',
    leave_msg='event_id: 0x12345'
)(_test_trace_time_map)
traced_test_trace_time_map(sleep_time=5)
cup.decorators.needlinux(function)[source]

make sure the func is only used on linux. Raise cup.err.DecoratorException otherwise.

Platform:Linux

example

from cup import decorators
@decorators.needlinux
def your_func():
    pass
cup.decorators.needposix(function)[source]

only support posix

Platform:Posix compatible

example

from cup import decorators
@decorators.needposix
def your_func():
    pass
cup.decorators.needmac(function)[source]

only support macOS

Platform:macOS

example

from cup import decorators
@decorators.needmac
def your_func():
    pass
cup.decorators.py_versioncheck(function, version)[source]
Platform:any platform + any functions in python
Parameters:version – The python on the OS should be >= param version. E.g. version=(‘2’, ‘7’, ‘0’) OS python version should >= 2.7.0

cup.err module

description:error related module
exception cup.err.BaseCupException(msg)[source]

Bases: exceptions.Exception

base cup Exception. All other cup Exceptions will inherit this.

exception cup.err.DecoratorException(msg)[source]

Bases: cup.err.BaseCupException

exception cup.err.LoggerException(msg)[source]

Bases: cup.err.BaseCupException

Exception for logging, especially for cup.log

exception cup.err.ResException(msg)[source]

Bases: cup.err.BaseCupException

Resource releated Exception

exception cup.err.NoSuchProcess(pid, str_process_name)[source]

Bases: cup.err.ResException

No such Process Exception

exception cup.err.AccessDenied(str_resouce)[source]

Bases: cup.err.ResException

Access Denied

exception cup.err.NetException(msg='')[source]

Bases: cup.err.BaseCupException

Network releated Exception

exception cup.err.AsyncMsgError(msg='')[source]

Bases: cup.err.NetException

cup.net.async msg related Exception

exception cup.err.ThreadTermException(msg='')[source]

Bases: cup.err.BaseCupException

Thread termination error

exception cup.err.LockFileError(msg='')[source]

Bases: cup.err.BaseCupException

exception cup.err.NotImplementedYet(msg='')[source]

Bases: cup.err.BaseCupException

Not implemented yet

exception cup.err.ConfigError(msg='')[source]

Bases: cup.err.BaseCupException

cup.exfile module

description:file related functions
class cup.exfile.LockFile(fpath, locktype=2)[source]

Bases: object

lock file class

lock(blocking=True)[source]

lock the file

Parameters:blocking – If blocking is True, will block there until cup gets the lock. True by default.
Returns:return False if locking fails
Raises:Exception – raise cup.err.LockFileError if blocking is False and the lock action failed
unlock()[source]

unlock the locked file

class cup.exfile.TempFile(filedir, prefix='', suffix='')[source]

Bases: object

tempfile, the temp file will be deleted immediately after the lifetime.

You can use TempFile like the original Python File Object.

tmp = TempFile('./')
tmp.write / read /seek / etc
tmp.close()

cup.flag module

description:
class cup.flag.TypeMan[source]

Bases: cup.flag.BaseMan

msg flag class

cup.log module

description:common log related module
cup.log.debug(msg, *args, **kwargs)[source]

Log a message with severity ‘DEBUG’ on the root logger.

cup.log.info(msg, *args, **kwargs)[source]

Log a message with severity ‘INFO’ on the root logger.

cup.log.warn(msg, *args, **kwargs)

Log a message with severity ‘WARNING’ on the root logger.

cup.log.critical(msg, *args, **kwargs)[source]

Log a message with severity ‘CRITICAL’ on the root logger.

cup.log.init_comlog(loggername, loglevel=20, logfile='cup.log', logtype=0, maxlogsize=1073741824, bprint_console=False, gen_wf=False)[source]

Initialize your logging

Parameters:
  • loggername – Unique logger name
  • loglevel – 4 default levels: log.DEBUG log.INFO log.ERROR log.CRITICAL
  • logfile – log file. Will try to create it if no existence
  • logtype

    Two type candidiates: log.ROTATION and log.INFINITE

    log.ROTATION will let logfile switch to a new one (30 files at most). When logger reaches the 30th logfile, will overwrite from the oldest to the most recent.

    log.INFINITE will write on the logfile infinitely

  • maxlogsize – maxmum log size with byte
  • b_printcmd – print to stdout or not?
  • gen_wf – print log msges with level >= WARNING to file (${logfile}.wf)

E.g.

import logging
from cup import log
log.init_comlog(
    'test',
    log.DEBUG,
    '/home/work/test/test.log',
    log.ROTATION,
    1024,
    False
)
log.info('test xxx')
log.critical('test critical')
cup.log.setloglevel(logginglevel)[source]

change log level during runtime

cup.log.reinit_comlog(loggername, loglevel=20, logfile='cup.log', logtype=0, maxlogsize=1073741824, bprint_console=False, gen_wf=False)[source]

reinitalize logging system, paramters same to init_comlog. reinit_comlog will reset all logging parameters, Make sure you used a different loggername from the old one!

cup.log.get_inited_loggername()[source]

get initialized logger name

cup.log.parse(logline)[source]

return a dict if the line is valid. Otherwise, return None

::
dict_info:= {
‘loglevel’: ‘DEBUG’, ‘date’: ‘2015-10-14’, ‘time’: ‘16:12:22,924’, ‘pid’: 8808, ‘tid’: 1111111, ‘srcline’: ‘util.py:33’, ‘msg’: ‘this is the log content’

}

cup.log.backtrace_info(msg, back_trace_len=0)[source]

info with backtrace support

cup.log.backtrace_debug(msg, back_trace_len=0)[source]

debug with backtrace support

cup.log.backtrace_error(msg, back_trace_len=0)[source]

error msg with backtarce support

cup.log.backtrace_critical(msg, back_trace_len=0)[source]

logging.CRITICAL with backtrace support

cup.mail module

description:

mail related modules.

Recommand using SmtpMailer

cup.mail.mutt_sendmail(tostr, subject, body, attach, content_is_html=False)[source]

Plz notice this function is not recommanded to use. Use SmtpMailer instead.

Parameters:
  • exec_cwd – exec working directory. Plz use
  • tostr – recipt list, separated by ,
  • subject – subject
  • body – email content
  • attach – email attachment
  • content_is_html – is htm mode opened
Returns:

return True on success, False otherwise

class cup.mail.SmtpMailer(sender, server='mail2-in.baidu.com', port=25, is_html=False)[source]

Bases: object

Parameters:
  • sender – mail sender
  • server – smtp的mailserver
  • port – port
  • is_html – is html enabled

smtp server examples

from cup import mail
mailer = mail.SmtpMailer(
    'xxx@xxx.com',
    'xxxx.smtp.xxx.com',
    is_html=True
)
mailer.sendmail(
    [
        'maguannan',
        'liuxuan05',
        'zhaominghao'
    ],
    'test_img',
    (
        'testset <img src="cid:screenshot.png"></img>'
    ),
    [
        '/home/work/screenshot.png',
        '../abc.zip'
    ]
)
sendmail(recipients, subject='', body='', attachments=None, cc=None, bcc=None)[source]

send mail

Parameters:
  • recipients – “list” of recipients. See the example above
  • subject – subject
  • body – body of the mail
  • attachments – “list” of attachments. Plz use absolute file path!
  • cc – cc list
  • bcc – bcc list
Returns:

return (True, None) on success, return (False, error_msg) otherwise

setup(sender, server, port=25, is_html=False)[source]

change parameters during run-time

cup.platforms module

description:cross-platform functions related module
cup.platforms.is_linux()[source]

Check if you are running on Linux.

Returns:True or False
cup.platforms.is_windows()[source]

Check if you are running on Windows.

Returns:True or False
cup.platforms.is_mac()[source]

is mac os

cup.thread module

description:cup thread module
cup.thread.async_raise(tid, exctype)[source]

Raises an exception in the threads with id tid

class cup.thread.CupThread(group=None, target=None, name=None, args=(), kwargs=None, verbose=None)[source]

Bases: threading.Thread

CupThread is a sub-class inherited from threading.Thread;

CupThread has 3 more features:

  1. raise_exc, to send a raise-exception signal to the thread,
    TRY to let the thread raise an exception.
  2. get_my_tid, get thread id
  3. terminate, to stop the thread
Notice if a thread in busy running under kernel-sysmode, it may not
response to the signals! Thus, it may not raise any exception/terminate even though cup has send a CupThread signal!
get_my_tid()[source]

return thread id

raise_exc(exctype)[source]

asynchrously send ‘raise exception’ signal to the thread. :param exctype:

raise Exception, exctype type is class
Returns:return 1 on success. 0 otherwise.
terminate(times=15)[source]

asynchrously terminate the thread.

Return True if the termination is successful or the thread is already stopped. Return False, otherwise.

Times:retry times until call for failure.
class cup.thread.RWLock[source]

Bases: object

read/write lock

acquire_readlock(wait_time=None)[source]

Acquire readlock. Wait_time same to wait_time for acquire_writelock

acquire_writelock(wait_time=None)[source]

Acquire write lock. If wait_time is not None and wait_time >=0, cup will wait until wait_time passes. If the call timeouts and cannot get the lock, will raise RuntimeError

release_readlock()[source]

release read lock

release_writelock()[source]

release write lock

cup.timeplus module

desc:time related module. looking forward to accepting new patches
cup.timeplus.get_str_now(fmt='%Y-%m-%d-%H-%M-%S')[source]

return string of ‘now’

Parameters:fmt – print-format, ‘%Y-%m-%d-%H-%M-%S’ by default

cup.unittest module

desc:unittest module
cup.unittest.assert_true(val, errmsg='')[source]

If val is not True, assert False and print to stdout.

Plz notice, log.critical(errmsg) will be invoked if logging system has been initialized with cup.log.init_comlog.

cup.unittest.assert_false(val, errmsg='')[source]

val should be False. Assert False otherwise.

cup.unittest.assert_eq(val, exp, errmsg='')[source]

if val != exp, aseert False and print errmsg

cup.unittest.assert_not_eq(val, exp, errmsg='')[source]

assert not equal to

cup.unittest.assert_eq_one(val, array, errmsg='')[source]

assert val equals one of the items in the [array]

cup.unittest.assert_lt(val, exp, errmsg='')[source]

assert_lt, expect val < exp

cup.unittest.assert_gt(val, exp, errmsg='')[source]

assert_gt, expect val > exp

cup.unittest.assert_ge(val, exp, errmsg='')[source]

expect val >= exp

cup.unittest.assert_le(val, exp, errmsg='')[source]

expect val <= exp

cup.unittest.assert_ne(val, exp, errmsg='')[source]

expect val != exp

class cup.unittest.CUTCase(logfile='./test.log', b_logstd=False, b_debug=False)[source]

Bases: object

CUTCase is compatible with nosetests. You can inherit this class and implement your own TestClass.

Notice class method [set_result] will set case status to True/False after executing the case. Then you can get case status in teardown through calling class method [get_result]

get_result()[source]

get case status during case teardown

set_result(b_result)[source]

set case running status

setup()[source]

set up

teardown()[source]
test_run()[source]

test_run function

class cup.unittest.CCaseExecutor[source]

Bases: object

Executor class for executing CUTCase test cases. See the example below

#!/usr/bin/env python

import sys
import os
import logging

import cup
import sb_global

from nfsinit import CClearMan
from nfs import CNfs
from cli import CCli

class TestMyCase(cup.unittest.CUTCase):
    def __init__(self):
        super(self.__class__, self).__init__(
            logfile=sb_global.G_CASE_RUN_LOG,
            b_logstd=False
        )
        cup.log.info( 'Start to run ' + str(__file__ ) )
        self._cli = CCli()
        self._nfs = CNfs()
        self._uniq_strman = cup.util.CGeneratorMan()
        self._clearman = CClearMan()

    def setup(self):
        str_uniq = self._uniq_strman.get_uniqname()
        self._workfolder = os.path.abspath(os.getcwd())                     + '/' + str_uniq
        self._work_remote_folder =                     self._nfs.translate_path_into_remote_under_rw_folder(
            str_uniq
            )
        os.mkdir(self._workfolder)
        self._clearman.addfolder(self._workfolder)

    def _check_filemd5(self, src, dst):
        ret = os.system('/usr/bin/diff --brief %s %s' % (src, dst) )
        cup.unittest.assert_eq( 0, ret )

    def test_run(self):
        #
        # @author: maguannan
        #
        inited_bigfile = sb_global.G_NFS_DISK_RD +                     sb_global.G_TEST_BIGFILE
        bigfile = self._workfolder +                     '/test_big_file_offsite_write_random_write'
        self.tmpfile = sb_global.G_TMP4USE +                     '/test_big_file_offsite_write_random_write'
        os.system( 'cp %s %s' % (inited_bigfile, bigfile) )
        os.system( 'cp %s %s' % (bigfile, self.tmpfile) )
        times = 50
        baseOffsite = 1000
        fp0 = open(bigfile, 'r+b')
        fp1 = open(self.tmpfile, 'rb+')
        for i in xrange( 0, times ):
            fp0.seek(baseOffsite)
            fp1.seek(baseOffsite)
            fp0.write( 'a' * 100 )
            fp1.write( 'a' * 100 )
            baseOffsite += 1000
        fp0.close()
        fp1.close()

        self._check_filemd5(bigfile, self.tmpfile)

    def teardown(self):
        if self.get_result():
            # if case succeeded, do sth
            os.unlink(self.tmpfile)
            self._clearman.teardown()
        else:
            # if case failed, do sth else.
            print 'failed'
        cup.log.info( 'End running ' + str(__file__ ) )

if __name__ == '__main__':
    cup.unittest.CCaseExecutor().runcase(TestMyCase())
classmethod runcase(case)[source]

run the case

cup.unittest.expect_raise(function, exception, *argc, **kwargs)[source]

expect raise exception

cup.version module

desc:CUP version for Programming Use Only.

Module contents

@author Guannan Ma