#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: [CUP] - See LICENSE for details.
# Authors: Minghao Zhao, Liu Xuan, Guannan Ma
"""
:desc:
unittest module
"""
from __future__ import print_function
import os
import sys
import hashlib
import traceback
import logging
from cup import log
from cup import err
# Names exported by a star-import of this module.
# NOTE(review): assert_in, assert_not_in, assert_boundary,
# assert_local_file_eq, assert_startswith and assert_none are defined in this
# module but are not listed here -- confirm whether that is intentional.
__all__ = [
'assert_true',
'assert_false',
'assert_eq',
'assert_not_eq',
'assert_eq_one',
'assert_lt',
'assert_gt',
'assert_ge',
'assert_le',
'assert_ne',
'CUTCase',
'CCaseExecutor',
'expect_raise'
]
def _assert_bool(val, exp, errmsg=''):
"""assert bool, val should be exp (either True or False)"""
if val is not exp:
msg = 'got {0}, expect {1}\nUser ErrMsg: {2}'.format(val, exp, errmsg)
try:
log.backtrace_critical(msg, 2)
# pylint: disable=W0703
except Exception:
pass
assert False, msg
def assert_true(val, errmsg=''):
    """
    Expect val to be True; the assertion fails otherwise.

    The failure is also reported through the cup.log logging system when it
    has been initialized with cup.log.init_comlog.

    :param val:
        value under test, must be a bool
    :param errmsg:
        extra text appended to the failure message
    :raise TypeError:
        if val is not a bool
    """
    if isinstance(val, bool):
        _assert_bool(val, True, errmsg)
        return
    raise TypeError('The type of val is not bool')
def assert_false(val, errmsg=''):
    """
    Expect val to be False; the assertion fails otherwise.

    :param val:
        value under test, must be a bool
    :param errmsg:
        extra text appended to the failure message
    :raise TypeError:
        if val is not a bool
    """
    if isinstance(val, bool):
        _assert_bool(val, False, errmsg)
        return
    raise TypeError('The type of val is not bool')
def assert_eq(val, exp, errmsg=''):
    """
    Expect val == exp.

    When the values differ, the failure text is passed to
    log.backtrace_critical (best effort, logging errors are ignored) and the
    assertion fails with that text.

    :param val:
        value under test
    :param exp:
        expected value
    :param errmsg:
        extra text appended to the failure message
    """
    if not (val != exp):
        return
    failure = 'got {0}, expect {1}\nUser ErrMsg: {2}'.format(val, exp, errmsg)
    try:
        log.backtrace_critical(failure, 1)
    # pylint: disable=W0703
    except Exception:
        pass
    assert False, failure
def assert_not_eq(val, exp, errmsg=''):
    """
    Expect val != exp.

    When the values compare equal, the failure text is passed to
    log.backtrace_critical (best effort, logging errors are ignored) and the
    assertion fails with that text.

    :param val:
        value under test
    :param exp:
        value that val must differ from
    :param errmsg:
        extra text appended to the failure message
    """
    if not (val == exp):
        return
    template = 'got {0} which is equal, expect not equal \nUser ErrMsg: {1}'
    failure = template.format(val, errmsg)
    try:
        log.backtrace_critical(failure, 1)
    # pylint: disable=W0703
    except Exception:
        pass
    assert False, failure
def assert_eq_one(val, array, errmsg=''):
    """
    Expect val to be equal to at least one item of array.

    Items are compared in iteration order and the scan stops at the first
    match. When nothing matches, every item is listed in the failure text,
    which is passed to log.backtrace_critical (best effort) before the
    assertion fails.

    :param val:
        value under test
    :param array:
        iterable of candidate values
    :param errmsg:
        extra text appended to the failure message
    """
    rendered = []
    for item in array:
        rendered.append('|' + str(item) + '|')
        if item == val:
            return
    failure = 'got {0}, expect one in the array: {1}\nUser ErrMsg: {2}'.format(
        val, ''.join(rendered), errmsg
    )
    try:
        log.backtrace_critical(failure, 1)
    # pylint: disable=W0703
    except Exception:
        pass
    assert False, failure
def assert_in(val, array, errmsg=''):
    """
    Alias of assert_eq_one, kept for backward compatibility.

    :param val:
        value under test
    :param array:
        iterable of candidate values
    :param errmsg:
        extra text appended to the failure message
    """
    assert_eq_one(val, array, errmsg=errmsg)
def assert_not_in(val, iteratables, errmsg=''):
    """
    Expect that val is not contained in iteratables (e.g. a list).

    :param val:
        value under test
    :param iteratables:
        container checked with the ``in`` operator
    :param errmsg:
        extra text appended to the failure message
    """
    present = val in iteratables
    assert not present, 'val :%s in iteratables. ErrMsg:%s' % (val, errmsg)
def assert_lt(val, exp, errmsg=''):
    """
    Expect val < exp.

    The check fails when ``val >= exp`` holds; the failure text is passed to
    log.backtrace_critical (best effort) before the assertion fails.

    :param val:
        value under test
    :param exp:
        exclusive upper bound
    :param errmsg:
        extra text appended to the failure message
    """
    if not (val >= exp):
        return
    template = 'got {0}, expect less than {1}\nUser ErrMsg:{2}'
    failure = template.format(val, exp, errmsg)
    try:
        log.backtrace_critical(failure, 1)
    # pylint: disable=W0703
    except Exception:
        pass
    assert False, failure
def assert_gt(val, exp, errmsg=''):
    """
    Expect val > exp.

    The check fails when ``val <= exp`` holds; the failure text is passed to
    log.backtrace_critical (best effort) before the assertion fails.

    :param val:
        value under test
    :param exp:
        exclusive lower bound
    :param errmsg:
        extra text appended to the failure message
    """
    if not (val <= exp):
        return
    template = 'got {0}, expect greater than {1}\nUser ErrMsg: {2}'
    failure = template.format(val, exp, errmsg)
    try:
        log.backtrace_critical(failure, 1)
    # pylint: disable=W0703
    except Exception:
        pass
    assert False, failure
def assert_ge(val, exp, errmsg=''):
    """
    Expect val >= exp.

    The check fails when ``val < exp`` holds; the failure text is passed to
    log.backtrace_critical (best effort) before the assertion fails.

    :param val:
        value under test
    :param exp:
        inclusive lower bound
    :param errmsg:
        extra text appended to the failure message
    """
    if not (val < exp):
        return
    template = (
        'got {0}, expect greater than (or equal to) {1}\n'
        'User ErrMsg:{2}'
    )
    failure = template.format(val, exp, errmsg)
    try:
        log.backtrace_critical(failure, 1)
    # pylint: disable=W0703
    except Exception:
        pass
    assert False, failure
def assert_le(val, exp, errmsg=''):
    """
    Expect val <= exp.

    The check fails when ``val > exp`` holds; the failure text is passed to
    log.backtrace_critical (best effort) before the assertion fails.

    :param val:
        value under test
    :param exp:
        inclusive upper bound
    :param errmsg:
        extra text appended to the failure message
    """
    if not (val > exp):
        return
    template = (
        'got {0}, expect less than (or equal to) {1}\n'
        'User ErrMsg:{2}'
    )
    failure = template.format(val, exp, errmsg)
    try:
        log.backtrace_critical(failure, 1)
    # pylint: disable=W0703
    except Exception:
        pass
    assert False, failure
def assert_ne(val, exp, errmsg=''):
    """
    Expect val != exp.

    When the two values compare equal, the failure text is passed to
    log.backtrace_critical (best effort, logging errors are ignored) and the
    assertion fails with that same text, so the compared values and the
    user supplied errmsg both show up in the AssertionError.

    :param val:
        value under test
    :param exp:
        value that val must differ from
    :param errmsg:
        extra text appended to the failure message
    """
    if val == exp:
        msg = (
            'Expect non-equal, got two equal values '
            '{0}:{1}\nUser Errmsg:{2}'.format(val, exp, errmsg)
        )
        try:
            log.backtrace_critical(msg, 1)
        # pylint: disable=W0703
        except Exception:
            pass
        # Report the full message, not just the bare user errmsg, so the
        # offending values are visible in the assertion failure.
        assert False, msg
def assert_boundary(val, low, high, errmsg=None):
    """
    Expect low <= val <= high.

    The lower bound is checked first; the upper bound is only checked once
    the lower bound holds.

    :param val:
        value under test
    :param low:
        inclusive lower bound
    :param high:
        inclusive upper bound
    :param errmsg:
        extra text included in the failure message
    """
    if val < low:
        too_small = (
            'Expect low <= val <= high, '
            'but val:{0} < low:{1}, msg:{2}'
        ).format(val, low, errmsg)
        assert False, too_small
    if val > high:
        too_big = (
            'Expect low <= val <= high, but val:%s > high:%s, msg:%s'
            % (val, high, errmsg)
        )
        assert False, too_big
def _get_md5_hex(src_file):
"""get md5 hext string of a file"""
with open(src_file, 'rb') as fhandle:
md5obj = hashlib.md5()
while True:
strtmp = fhandle.read(131072) # read 128k
if len(strtmp) <= 0:
break
md5obj.update(strtmp.encode('utf-8'))
return md5obj.hexdigest()
def assert_local_file_eq(srcfile, dstfile, errmsg=None):
    """
    Expect two local files to have the same md5 value.

    Both paths must exist and be regular files; this is asserted (source
    first, then destination) before either file is hashed.

    :param srcfile:
        path of the first file
    :param dstfile:
        path of the second file
    :param errmsg:
        extra text included in the failure message
    """
    for path in (srcfile, dstfile):
        assert os.path.exists(path)
        assert os.path.isfile(path)
    src_digest = _get_md5_hex(srcfile)
    dst_digest = _get_md5_hex(dstfile)
    failure = 'expect same md5 value. src:%s, dst:%s, errmsg:%s' % (
        src_digest, dst_digest, errmsg
    )
    assert src_digest == dst_digest, failure
def assert_startswith(source, comp, errmsg=None):
    """
    Expect source to start with comp; the assertion fails otherwise.

    :param source:
        object providing ``startswith`` (e.g. a string)
    :param comp:
        expected prefix
    :param errmsg:
        optional extra text appended to the failure message
    """
    if not source.startswith(comp):
        msg = 'expect source:{0} starts with:{1}'.format(source, comp)
        # Keep the caller's message instead of discarding it.
        if errmsg is not None:
            msg = '{0}\nUser ErrMsg: {1}'.format(msg, errmsg)
        assert False, msg
def assert_none(source):
    """
    Expect source to be None; the assertion fails otherwise.

    :param source:
        value under test
    """
    assert source is None, (
        'expect source is None, but now is {0}'.format(source)
    )
class CUTCase(object):
    """
    CUTCase is compatible with nosetests. You can inherit this class
    and implement your own TestClass by overriding setup / test_run /
    teardown (and, optionally, fail_teardown).

    The method [set_result] stores the case status (True/False) once the
    case has been executed. The status can then be read back in the
    teardown hooks by calling the method [get_result].
    """

    def __init__(self, logfile='./test.log', b_logstd=False, b_debug=False):
        """
        :param logfile:
            will invoke log.init_comlog to intialize logging in order to
            support invoking logging functions log.info/debug/warn
        :param b_logstd:
            print to stdout or not
        :param b_debug:
            enable debug mode or not.
            If enabled, log level will be set to log.DEBUG.
            log.INFO (log level) will be set by default.
        """
        self._result = False
        if b_debug:
            debuglevel = logging.DEBUG
        else:
            debuglevel = logging.INFO
        # 5242880 == 5 * 1024 * 1024; presumably the size limit used for
        # log rotation -- confirm against cup.log.init_comlog.
        log.init_comlog(
            'test_case', debuglevel,
            logfile, log.ROTATION, 5242880, b_logstd
        )

    def setup(self):
        """
        set up. Does nothing by default; override it to prepare the case.
        """
        pass

    def test_run(self):
        """
        test_run function. Does nothing by default; override it with the
        body of the test.
        """
        pass

    def set_result(self, b_result):
        """
        set case running status

        :param b_result:
            True if the case passed, False otherwise
        """
        self._result = b_result

    def get_result(self):
        """
        get case status during case teardown

        :return:
            the value last stored with set_result (False until then)
        """
        return self._result

    def teardown(self):
        """
        teardown. Does nothing by default; override it to clean up.
        """
        pass

    def fail_teardown(self):
        """
        Hook looked up on the case by CCaseExecutor.runcase after a failed
        run. Does nothing by default; override it to clean up after a
        failure.
        """
        pass
# pylint: disable=R0903
[docs]class CCaseExecutor(object):
"""
Executor class for executing CUTCase test cases. See the example below
::
#!/usr/bin/env python
import sys
import os
import logging
import cup
import sb_global
from nfsinit import CClearMan
from nfs import CNfs
from cli import CCli
class TestMyCase(cup.unittest.CUTCase):
def __init__(self):
super(self.__class__, self).__init__(
logfile=sb_global.G_CASE_RUN_LOG,
b_logstd=False
)
cup.log.info( 'Start to run ' + str(__file__ ) )
self._cli = CCli()
self._nfs = CNfs()
self._uniq_strman = cup.util.CGeneratorMan()
self._clearman = CClearMan()
def setup(self):
str_uniq = self._uniq_strman.get_uniqname()
self._workfolder = os.path.abspath(os.getcwd()) \
+ '/' + str_uniq
self._work_remote_folder = \
self._nfs.translate_path_into_remote_under_rw_folder(
str_uniq
)
os.mkdir(self._workfolder)
self._clearman.addfolder(self._workfolder)
def _check_filemd5(self, src, dst):
ret = os.system('/usr/bin/diff --brief %s %s' % (src, dst) )
cup.unittest.assert_eq( 0, ret )
def test_run(self):
#
# @author: maguannan
#
inited_bigfile = sb_global.G_NFS_DISK_RD + \
sb_global.G_TEST_BIGFILE
bigfile = self._workfolder + \
'/test_big_file_offsite_write_random_write'
self.tmpfile = sb_global.G_TMP4USE + \
'/test_big_file_offsite_write_random_write'
os.system( 'cp %s %s' % (inited_bigfile, bigfile) )
os.system( 'cp %s %s' % (bigfile, self.tmpfile) )
times = 50
baseOffsite = 1000
fp0 = open(bigfile, 'r+b')
fp1 = open(self.tmpfile, 'rb+')
for i in range( 0, times ):
fp0.seek(baseOffsite)
fp1.seek(baseOffsite)
fp0.write( 'a' * 100 )
fp1.write( 'a' * 100 )
baseOffsite += 1000
fp0.close()
fp1.close()
self._check_filemd5(bigfile, self.tmpfile)
def teardown(self):
if self.get_result():
# if case succeeded, do sth
os.unlink(self.tmpfile)
self._clearman.teardown()
else:
# if case failed, do sth else.
print 'failed'
cup.log.info( 'End running ' + str(__file__ ) )
if __name__ == '__main__':
cup.unittest.CCaseExecutor().runcase(TestMyCase())
"""
[docs] @classmethod
def runcase(cls, case):
"""
run the case
"""
failed = False
try:
case.setup()
case.test_run()
case.set_result(True)
# pylint: disable=W0703
except Exception:
print(traceback.format_exc())
case.set_result(False)
failed = True
# case.teardown()
if failed:
try:
case.fail_teardown()
# pylint: disable=W0703
except Exception:
pass
print('========================')
print('======== Failed ========')
print('========================')
sys.exit(-1)
case.teardown()
print('========================')
print('======== Passed ========')
print('========================')
def expect_raise(function, exception, *argc, **kwargs):
    """
    Expect that calling function raises exception.

    :param function:
        callable to invoke
    :param exception:
        exception class (or tuple of classes) the call is expected to raise
    :param argc:
        positional arguments forwarded to function
    :param kwargs:
        keyword arguments forwarded to function
    :raise err.ExpectFailure:
        if function returns without raising the expected exception.
        Exceptions of any other type raised by function propagate unchanged.
    """
    try:
        function(*argc, **kwargs)
    except exception:
        return
    # Raised outside the try block on purpose: when ``exception`` is a base
    # class of ExpectFailure (e.g. Exception), raising inside the try would
    # be caught by the handler above and the failure would go unnoticed.
    raise err.ExpectFailure(exception, None)
# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent