#!/usr/bin/python
# -*- coding: utf-8 -*
# Copyright: [CUP] - See LICENSE for details.
# Authors: Guannan Ma (@mythmgn),
"""
:description:
common log related module
"""
# pylint: disable=unspecified-encoding,broad-except
from __future__ import print_function
__all__ = [
'debug', 'info', 'warning', 'critical',
'init_comlog', 'setloglevel', 'ROTATION', 'INFINITE',
'reinit_comlog', 'parse',
'backtrace_info', 'backtrace_debug', 'backtrace_error',
'backtrace_critical',
'info_if', 'debug_if', 'warn_if', 'error_if', 'critical_if',
# x* functions are for loggers other than logging.root (the global logger)
'xinit_comlog', 'xdebug', 'xinfo', 'xwarn', 'xerror', 'xcritical'
]
import os
import re
import sys
import time
import logging
from logging import handlers
import threading
import collections
import cup
from cup import err
from cup import platforms
ROTATION = 0
INFINITE = 1
ROTATION_COUNTS = 30
DEBUG = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARNING
ERROR = logging.ERROR
CRITICAL = logging.CRITICAL
G_INITED_LOGGER = []
info = logging.info
warn = logging.warning
warning = logging.warning
error = logging.error
debug = logging.debug
critical = logging.critical
LoggerParams = collections.namedtuple('LoggerParams', [
'loglevel', # one of logging.INFO logging.DEBUG logging.xxx levels
'logfile', # valid logfile position, e.g. /home/test/test.log
'logtype', # log.ROTATION log.INFINITE
'maxlogsize', # logsize for one log, in bytes
'bprint_console', # True, logger will printn to stdout, False, otherwise
'gen_wf' # True/False, generate log lines with level >= WARNNING
])
class _Singleton: # pylint: disable=R0903
"""
internal use for logging. Plz use @Singoleton in cup.decorators
"""
_LOCK = threading.Lock()
def __init__(self, cls):
self.__instance = None
self.__cls = cls
def __call__(self, *args, **kwargs):
# pylint: disable=consider-using-with
self._LOCK.acquire()
if self.__instance is None:
self.__instance = self.__cls(*args, **kwargs)
self._LOCK.release()
return self.__instance
# pylint: disable=too-few-public-methods
class _MsgFilter(logging.Filter):
"""
Msg filters by log levels
"""
# pylint: disable= super-init-not-called
def __init__(self, msg_level=logging.WARNING):
self.msg_level = msg_level
def filter(self, record):
if record.levelno >= self.msg_level:
return False
return True
class LogInitializer:
"""
default log initalizer
"""
# def config_filelogger(self,
# logger, loglevel, strlogfile, logtype,
# maxsize, bprint_console, gen_wf=False
# ): # too many arg pylint: disable=R0913
# pylint:disable=too-many-locals
@classmethod
def setup_filelogger(cls, logger, logparams):
"""
config logger
"""
loglevel = logparams.loglevel
strlogfile = logparams.logfile
logtype = logparams.logtype
maxsize = logparams.maxlogsize
bprint_console = logparams.bprint_console
gen_wf = logparams.gen_wf
if not os.path.exists(strlogfile):
try:
if platforms.is_linux():
os.mknod(strlogfile)
else:
# for py2 compatibility use
with open(strlogfile, 'w+') as fhandle:
fhandle.write('\n')
except IOError as errinfo:
raise IOError(
'logfile does not exist. '
'try to create it. but file creation failed'
) from errinfo
logger.setLevel(loglevel)
# '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)s - %(message)s'
tznum = time.strftime('%z')
tzkey = time.strftime('%Z')
formatter = logging.Formatter(
fmt='%(levelname)s:\t %(asctime)s {0}({1}) * '
'[%(process)d:%(thread)x] [%(filename)s:%(lineno)s] %(message)s'
.format(tznum, tzkey)
)
if bprint_console:
info('bprint_console enabled, will print to stdout')
streamhandler = logging.StreamHandler()
streamhandler.setLevel(loglevel)
streamhandler.setFormatter(formatter)
logger.addHandler(streamhandler)
fdhandler = None
if logtype == ROTATION:
fdhandler = handlers.RotatingFileHandler(
strlogfile, 'a', maxsize, ROTATION_COUNTS, encoding='utf-8'
)
else:
fdhandler = logging.FileHandler(
strlogfile, 'a', encoding='utf-8'
)
fdhandler.setFormatter(formatter)
fdhandler.setLevel(loglevel)
if gen_wf:
# add .wf handler
file_wf = str(strlogfile) + '.wf'
warn_handler = logging.FileHandler(file_wf, 'a', encoding='utf-8')
warn_handler.setLevel(logging.WARNING)
warn_handler.setFormatter(formatter)
logger.addHandler(warn_handler)
fdhandler.addFilter(_MsgFilter(logging.WARNING))
logger.addHandler(fdhandler)
@classmethod
def proc_thd_id(cls):
"""return proc thread id"""
return '{0}:{1}'.format(
os.getpid(), threading.current_thread().ident
)
@classmethod
def get_codeline(cls, back=0):
"""get code line"""
return sys._getframe(back + 1).f_lineno # traceback pylint:disable=W0212
@classmethod
def get_codefile(cls, back=0):
"""
get code file
"""
# pylint: disable=W0212
# to get code filename
return os.path.basename(sys._getframe(back + 1).f_code.co_filename)
@classmethod
def log_file_func_info(cls, msg, back_trace_len=0):
"""return log traceback info"""
tempmsg = ' * [%s] [%s:%s] ' % (
cls.proc_thd_id(), cls.get_codefile(2 + back_trace_len),
cls.get_codeline(2 + back_trace_len)
)
msg = '{0}{1}'.format(tempmsg, msg)
if platforms.is_py2():
# pylint: disable=undefined-variable
if isinstance(msg, unicode):
return msg
return msg.decode('utf8')
return msg
# pylint: disable=R0903
@_Singleton
class _RootLogerMan:
_instance = None
_rootlogger = None
_b_rotation = False
_logfile = ''
_logtype = ROTATION
_loggername = None
def __init__(self):
pass
def get_rootlogger(self):
"""
get default(root) logger
"""
if self._rootlogger is None:
raise err.LoggerException(
'The Cup logger has not been initalized Yet. '
'Call init_comlog first'
)
return self._rootlogger
def set_rootlogger(self, loggername, logger):
"""
set default(root) logger with a new loggername
"""
if self._rootlogger is not None:
raise err.LoggerException(
"""WARNING!!! The cup logger has been initalized already\
.Plz do NOT init_comlog twice""")
self._rootlogger = logger
self._loggername = loggername
def reset_rootlogger(self, logger):
"""reset root logger"""
global G_INITED_LOGGER
tmplogger = self._rootlogger
while len(tmplogger.handlers) > 0:
tmplogger.removeHandler(tmplogger.handlers[0])
del tmplogger
self._rootlogger = logger
logging.root = logger
def is_initalized(self):
"""
Initialized or not
"""
if self._rootlogger is None:
return False
return True
# too many arg pylint: disable=R0913
[docs]def init_comlog(loggername, loglevel=logging.INFO, logfile='cup.log',
logtype=ROTATION, maxlogsize=1073741824, bprint_console=False,
gen_wf=False):
"""
Initialize your default logger
:param loggername:
Unique logger name for default logging.
:param loglevel:
4 default levels: log.DEBUG log.INFO log.ERROR log.CRITICAL
:param logfile:
log file. Will try to create it if no existence
:param logtype:
Two type candidiates: log.ROTATION and log.INFINITE
log.ROTATION will let logfile switch to a new one (30 files at most).
When logger reaches the 30th logfile, will overwrite from the
oldest to the most recent.
log.INFINITE will write on the logfile infinitely
:param maxlogsize:
maxmum log size with byte
:param b_printcmd:
print to stdout or not?
:param gen_wf:
print log msges with level >= WARNING to file (${logfile}.wf)
*E.g.*
::
import logging
from cup import log
log.init_comlog(
'test',
log.DEBUG,
'/home/work/test/test.log',
log.ROTATION,
1024,
False
)
log.info('test xxx')
log.critical('test critical')
"""
loggerman = _RootLogerMan()
rootloger = logging.getLogger()
if not loggerman.is_initalized():
loggerman.set_rootlogger(loggername, rootloger)
if os.path.exists(logfile) is False:
if platforms.is_linux():
os.mknod(logfile)
else:
with open(logfile, 'w+') as fhandle:
fhandle.write('\n')
elif os.path.isfile(logfile) is False:
raise err.LoggerException(
'The log file exists. But it\'s not regular file'
)
loggerparams = LoggerParams(
loglevel, logfile, logtype, maxlogsize, bprint_console, gen_wf
)
LogInitializer.setup_filelogger(rootloger, loggerparams)
info('-' * 20 + 'Log Initialized Successfully' + '-' * 20)
global G_INITED_LOGGER
G_INITED_LOGGER.append(loggername)
else:
print('[{0}:{1}] init_comlog has been already initalized'.format(
LogInitializer.get_codefile(1), LogInitializer.get_codeline(1)
))
# too many arg pylint: disable=R0913
[docs]def reinit_comlog(loggername, loglevel=logging.INFO, logfile='cup.log',
logtype=ROTATION, maxlogsize=1073741824,
bprint_console=False, gen_wf=False):
"""
reinitialize default root logger for cup logging
:param loggername:
logger name, should be different from the original one
"""
global G_INITED_LOGGER
if loggername in G_INITED_LOGGER:
msg = ('loggername:{0} has been already used!!! Change a name'.format(
loggername))
raise ValueError(msg)
G_INITED_LOGGER.append(loggername)
tmplogger = logging.getLogger(loggername)
if os.path.exists(logfile) is False:
if platforms.is_linux():
os.mknod(logfile)
else:
with open(logfile, 'w+') as fhandle:
fhandle.write('\n')
elif os.path.isfile(logfile) is False:
raise err.LoggerException(
'The log file exists. But it\'s not regular file'
)
loggerman = _RootLogerMan()
loggerparms = LoggerParams(
loglevel, logfile, logtype, maxlogsize, bprint_console, gen_wf
)
LogInitializer.setup_filelogger(tmplogger, loggerparms)
loggerman.reset_rootlogger(tmplogger)
info('-' * 20 + 'Log Reinitialized Successfully' + '-' * 20)
return
def _fail_handle(msg, e):
if platforms.is_py2():
# pylint: disable=undefined-variable
if not isinstance(msg, unicode):
msg = msg.decode('utf8')
print('{0}\nerror:{1}'.format(msg, e))
elif platforms.is_py3():
print('{0}\nerror:{1}'.format(msg, e))
[docs]def backtrace_info(msg, back_trace_len=0):
"""
info with backtrace support
"""
try:
msg = LogInitializer.log_file_func_info(msg, back_trace_len)
loggerman = _RootLogerMan()
loggerman.get_rootlogger().info(msg)
except err.LoggerException:
return
except Exception as errinfo:
_fail_handle(msg, errinfo)
[docs]def backtrace_debug(msg, back_trace_len=0):
"""
debug with backtrace support
"""
try:
msg = LogInitializer.log_file_func_info(msg, back_trace_len)
loggerman = _RootLogerMan()
loggerman.get_rootlogger().debug(msg)
except err.LoggerException:
return
except Exception as errinfo:
_fail_handle(msg, errinfo)
def backtrace_warn(msg, back_trace_len=0):
"""
warning msg with backtrace support
"""
try:
msg = LogInitializer.log_file_func_info(msg, back_trace_len)
loggerman = _RootLogerMan()
loggerman.get_rootlogger().warning(msg)
except err.LoggerException:
return
# pylint: disable=broad-except
except Exception as errinfo:
_fail_handle(msg, errinfo)
[docs]def backtrace_error(msg, back_trace_len=0):
"""
error msg with backtarce support
"""
try:
msg = LogInitializer.log_file_func_info(msg, back_trace_len)
loggerman = _RootLogerMan()
loggerman.get_rootlogger().error(msg)
except err.LoggerException:
return
except Exception as errinfo:
_fail_handle(msg, errinfo)
[docs]def backtrace_critical(msg, back_trace_len=0):
"""
logging.CRITICAL with backtrace support
"""
try:
msg = LogInitializer.log_file_func_info(msg, back_trace_len)
loggerman = _RootLogerMan()
loggerman.get_rootlogger().critical(msg)
except err.LoggerException:
return
# pylint:disable=broad-except
except Exception as errinfo:
_fail_handle(msg, errinfo)
[docs]def setloglevel(logginglevel):
"""
change log level during runtime
::
from cup import log
log.setloglevel(log.DEBUG)
"""
loggerman = _RootLogerMan()
loggerman.get_rootlogger().setLevel(logginglevel)
[docs]def parse(logline):
"""
return a dict if the line is valid.
Otherwise, return None
:raise Exception:
ValueError if logline is invalid
::
dict_info:= {
'loglevel': 'DEBUG',
'date': '2015-10-14',
'time': '16:12:22,924',
'pid': 8808,
'tid': 1111111,
'srcline': 'util.py:33',
'msg': 'this is the log content',
'tznum': 8,
'tzstr': 'CST'
}
"""
content = logline[logline.rfind(']') + 1:].strip()
# content = content[(content.find(']') + 1):]
# content = content[(content.find(']') + 1):].strip()
regex = re.compile('[ \t]+')
items = regex.split(logline)
loglevel, date, time_, timezone, _, pid_tid, src = items[0:7]
pid, tid = pid_tid.strip('[]').split(':')
tznum, tzkey = timezone.strip('+)').split('(')
try:
return {
'loglevel': loglevel.strip(':'),
'date': date,
'time': time_,
'pid': pid,
'tid': tid,
'srcline': src.strip('[]'),
'msg': content,
'tznum': int(tznum),
'tzkey': tzkey
}
# pylint: disable = W0703
except Exception as errinfo:
raise ValueError(errinfo) from errinfo
[docs]def info_if(bol, msg, back_trace_len=1):
"""log msg with info loglevel if bol is true"""
if bol:
info(msg, back_trace_len)
[docs]def error_if(bol, msg, back_trace_len=1):
"""log msg with error loglevel if bol is true"""
if bol:
error(msg, back_trace_len)
[docs]def warn_if(bol, msg, back_trace_len=1):
"""log msg with error loglevel if bol is true"""
if bol:
warn(msg, back_trace_len)
[docs]def critical_if(bol, msg, back_trace_len=1):
"""log msg with critical loglevel if bol is true"""
if bol:
critical(msg, back_trace_len)
[docs]def debug_if(bol, msg, back_trace_len=1):
"""log msg with critical loglevel if bol is true"""
if bol:
debug(msg, back_trace_len)
[docs]def xinit_comlog(loggername, logger_params):
"""
xinit_comlog along with xdebug xinfo xwarn xerror are functions for
different loggers other than logging.root (the global logger).
:param loggername:
loggername, example: http.access,
:param logger_params:
object of LoggerParams
Code example:
::
logparams = log.LoggerParams(
log.DEBUG, 'cup.x.log', log.ROTATION, 100 * 1024 * 1024,
True, True
)
log.xinit_comlog('log.x', logparams)
log.xdebug('log.x', 'xdebug')
log.xinfo('log.x', 'xinfo')
log.xerror('log.x', 'xerror')
logparams = log.LoggerParams(
log.DEBUG, 'cup.y.log', log.ROTATION, 100 * 1024 * 1024,
True, True
)
log.xinit_comlog('log.y', logparams)
log.xdebug('log.y', 'ydebug')
log.xinfo('log.y', 'yinfo')
log.xerror('log.y', 'yerror')
"""
if not isinstance(logger_params, LoggerParams):
raise TypeError('logger_params should be a object of log.LoggerParams')
logger = logging.getLogger(loggername)
LogInitializer.setup_filelogger(logger, logger_params)
[docs]def xdebug(loggername, msg, back_trace_len=1):
"""
:param loggername:
shoule be xinit_comlog before calling xdebug/xinfo/xerror/xcritical
:param msg:
log msg
:back_trace_len:
1 by default, ignore this if you don't need this
"""
logger = logging.getLogger(loggername)
logger.debug(LogInitializer.log_file_func_info(msg, back_trace_len))
[docs]def xinfo(loggername, msg, back_trace_len=1):
"""
:param loggername:
shoule be xinit_comlog before calling xdebug/xinfo/xerror/xcritical
:param msg:
log msg
:back_trace_len:
default 1, just ignore this param if you don't know what it is.
This param will trace back 1 layer and get the
[code_filename:code_lines]
"""
logger = logging.getLogger(loggername)
logger.info(LogInitializer.log_file_func_info(msg, back_trace_len))
[docs]def xwarn(loggername, msg, back_trace_len=1):
"""
:param loggername:
shoule be xinit_comlog before calling xdebug/xinfo/xerror/xcritical
:param msg:
log msg
:back_trace_len:
default 1, just ignore this param if you don't know what it is.
This param will trace back 1 layer and get the
[code_filename:code_lines]
"""
logger = logging.getLogger(loggername)
logger.warning(LogInitializer.log_file_func_info(msg, back_trace_len))
[docs]def xerror(loggername, msg, back_trace_len=1):
"""
:param loggername:
shoule be xinit_comlog before calling xdebug/xinfo/xerror/xcritical
:param msg:
log msg
:back_trace_len:
default 1, just ignore this param if you don't know what it is.
This param will trace back 1 layer and get the
[code_filename:code_lines]
"""
logger = logging.getLogger(loggername)
logger.error(LogInitializer.log_file_func_info(msg, back_trace_len))
[docs]def xcritical(loggername, msg, back_trace_len=1):
"""
:param loggername:
shoule be xinit_comlog before calling xdebug/xinfo/xerror/xcritical
:param msg:
log msg
:back_trace_len:
default 1, just ignore this param if you don't know what it is.
This param will trace back 1 layer and get the
[code_filename:code_lines]
"""
logger = logging.getLogger(loggername)
logger.critical(LogInitializer.log_file_func_info(msg, back_trace_len))
if __name__ == '__main__':
cup.log.debug('中文')
cup.log.init_comlog(
'test', cup.log.DEBUG, './test.log',
cup.log.ROTATION, 102400000, False
)
cup.log.init_comlog(
'test', cup.log.DEBUG, './test.log',
cup.log.ROTATION, 102400000, False
)
cup.log.info('test info')
cup.log.debug('test debug')
cup.log.info('中文'.decode('utf8'))
cup.log.reinit_comlog(
're-test', cup.log.DEBUG, './re.test.log',
cup.log.ROTATION, 102400000, False
)
cup.log.reinit_comlog(
're-test', cup.log.DEBUG, './re.test.log',
cup.log.ROTATION, 102400000, False
)
cup.log.info('re:test info')
cup.log.debug('re:test debug')
cup.log.debug('re:中文')
# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent