
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright: [CUP] - See LICENSE for details.
# Authors: Guannan Ma (@mythmgn),
"""
:description:
    serilizers including local file serilizer
"""
import os
import time
import traceback
import threading
import collections

from cup import log
from cup import exfile

__all__ = [
    'LOGFILE_GOOD', 'LOGFILE_EOF', 'LOGFILE_BAD_RECORD', 'LOG_MOD_ADD',
    'LOG_MOD_SET', 'LOG_MOD_DEL', 'LogRecord', 'LocalFileSerilizer'
]

LOGFILE_GOOD = 0
LOGFILE_EOF = 1
LOGFILE_BAD_RECORD = 2

LOG_MOD_ADD = 0
LOG_MOD_SET = 1
LOG_MOD_DEL = 3

LogRecord = collections.namedtuple(
    'LogRecord', ['length', 'log_id', 'log_type', 'log_mode', 'log_binary']
)
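
# On-disk record layout produced by LocalFileSerilizer.add_log and parsed
# back by _try_read_one_log below; every integer field is little-endian:
#
#   | 32bit length | 128bit log_id | 16bit log_type | 16bit log_mode | binary |
#
# "length" counts everything after the length field itself, so a whole
# record occupies (4 + length) bytes on disk.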


class BaseSerilizer(object):
    """base serilizer for agent"""
    def __init__(self):
        """pass"""

    def add_log(self, log_type, log_mode, log_binary):
        """add one log"""

    def load_logs(self, record_num):
        """pass"""

    @classmethod
    def asign_uint2byte_bybits(cls, num, bits):
        """convert a uint into a little-endian byte string of bits // 8 bytes"""
        asign_len = bits // 8
        tmp = ''
        while True:
            # floor division keeps precision for large (e.g. 128bit) values
            quotient = num // 256
            remainder = num % 256
            tmp += chr(remainder)
            if quotient < 256:
                tmp += chr(quotient)
                break
            else:
                num = quotient
        length = len(tmp)
        if length < asign_len:
            for _ in range(0, asign_len - length):
                tmp += chr(0)
        return tmp

    @classmethod
    def convert_bytes2uint(cls, str_data):
        """convert a little-endian byte string into a uint"""
        num = 0
        for b_ind, char in enumerate(str_data):
            num += pow(256, b_ind) * ord(char)
        return num
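
    # A minimal round-trip sketch for the two helpers above (Python 2
    # semantics, where byte strings are plain str):
    #
    #     raw = BaseSerilizer.asign_uint2byte_bybits(1025, 32)
    #     assert len(raw) == 4                         # 32 bits -> 4 bytes
    #     assert raw == '\x01\x04\x00\x00'             # little-endian
    #     assert BaseSerilizer.convert_bytes2uint(raw) == 1025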


MsgPostRevision = collections.namedtuple(
    'MsgPostRevision', ['hostkey', 'revision_id', 'is_post']
)



class LocalFileSerilizer(BaseSerilizer):
    """local file serializer"""
    def __init__(
        self, storage_dir, skip_badlog=False,
        max_logfile_size=1024 * 1024, persist_after_sec=10 * 60
    ):
        """
        :param skip_badlog:
            Attention: use this parameter with great care. It will skip
            bad records, which means you have a high chance of losing data.
        :param persist_after_sec:
            seconds after which logs should be persisted
        """
        BaseSerilizer.__init__(self)
        self._skip_badlog = skip_badlog
        self._storage = os.path.abspath(storage_dir)
        self._logid = 0
        self._max_log_file_size = max_logfile_size
        self._current_filesize = 0
        self._writestream = None
        self._load_stream = None
        self._current_loadfile = None
        self._buffer_files = None
        self._persist_sec = persist_after_sec
        self._record_lenbytes = 4
        self._loglist_stream = None
        self._loglist_switching = False
        self._logfile_list = '{0}/logfile.list'.format(storage_dir)
        self._logfile_listnew = '{0}.new'.format(self._logfile_list)
        self._logfile_listold = '{0}.old'.format(self._logfile_list)
        self._loglist_switch = '{0}.switch'.format(self._logfile_list)
        self._mlock = threading.Lock()
        self._name = self.__class__.__name__

    def set_name(self, name):
        """set a string name for the serializer"""
        self._name = 'serializer: ({0})'.format(name)

    @classmethod
    def __cmp_logfile_id(cls, first, second):
        """compare two logfile names, first by prefix, then by log id"""
        key_first, value_first = first.split('.')
        key_second, value_second = second.split('.')
        if key_first < key_second:
            return -1
        elif key_first > key_second:
            return 1
        if int(value_first) <= int(value_second):
            return -1
        return 1

    def _get_storage_dir(self, logid=-1, logtype=None, logmode=None):
        """get storage dir"""
        folder = '{0}/{1}'.format(self._storage, self.get_subdir())
        return folder

    def _recover_from_lastwriting(self, truncate_last_failure=True):
        """
        recover from the last log writing

        :raise Exception:
            IOError, if any error happens
        """
        folder = self._get_storage_dir()
        files = self._get_ordered_logfiles(folder)
        if len(files) < 1:
            log.info('no need to recover, folder does not contain any files')
            return
        file_start_logid = -1
        seek_file = None
        if files[-1].find('writing') < 0:
            # does not need recovery
            log.info('does not have unfinished logfile')
            file_start_logid = int(files[-1].split('.')[-1]) + 1
            seek_file = files[-1]
        else:
            # needs recovery; there must be at most one writing log file
            count = 0
            for fname in files:
                if fname.find('writing') >= 0:
                    count += 1
            if count > 1:
                errmsg = 'has more than 1 writing log files, count:{0}'.format(
                    count)
                log.error(errmsg)
                raise IOError(errmsg)
            log.info('has an unfinished log file, recovery starts')
            seek_file = files[-1]
            file_start_logid = int(seek_file.split('.')[-1])
        fullname = os.path.normpath('{0}/{1}'.format(folder, seek_file))
        done_id = file_start_logid
        read_length = 0
        try:
            reader = open(fullname, 'r')
            while True:
                ret, _ = self._try_read_one_log(reader)
                if ret == LOGFILE_BAD_RECORD:
                    if truncate_last_failure:
                        log.info(
                            'to truncate last writing file:{0}'
                            ', size:{1}'.format(files[-1], read_length)
                        )
                        reader.close()
                        # reopen with r+ ('w' would zero the file out before
                        # the truncation takes place)
                        reader = open(fullname, 'r+')
                        reader.truncate(read_length)
                        reader.close()
                        reader = open(fullname, 'r')
                        break
                    else:
                        raise IOError(
                            'truncate_last_failure is not enabled &&'
                            ' Bad record found'
                        )
                elif ret == LOGFILE_EOF:
                    log.info('log file is intact, no need truncating')
                    break
                elif ret == LOGFILE_GOOD:
                    done_id += 1
                    read_length = reader.tell()
            newname = os.path.normpath('{0}/{1}'.format(
                folder, 'done.{0}'.format(file_start_logid))
            )
            if fullname != newname:
                log.info('move last writing file {0} to {1}'.format(
                    fullname, newname)
                )
                os.rename(fullname, newname)
                self._loglist_stream.write('{0}\n'.format(newname))
                self._loglist_stream.flush()
                os.fsync(self._loglist_stream.fileno())
            self._logid = done_id
            log.info('next logid will be {0}'.format(self._logid))
        except Exception as err:
            log.error('failed to recover from last log:{0}'.format(err))
            raise IOError(err)
        return

    def __del__(self):
        try:
            if self._writestream is not None:
                self._writestream.close()
            if self._load_stream is not None:
                self._load_stream.close()
        # pylint:disable=W0703
        except Exception:
            pass

    def _stream_close(self):
        """close the current write stream"""
        self._writestream.close()

    def _stream_wbopen(self, fname):
        """open a new write stream"""
        ret = False
        try:
            parent = os.path.dirname(fname)
            if not os.path.exists(parent):
                os.makedirs(parent)
            self._writestream = open(fname, 'w+b')
            log.debug('open new stream succeed')
            ret = True
        except IOError as err:
            log.error(
                'IOError, failed to open stream, err:{0}, file:{1}'.format(
                    err, fname)
            )
        except OSError as err:
            log.error(
                'OSError, failed to open stream, err:{0}, file:{1}'.format(
                    err, fname)
            )
        return ret

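    # Storage layout, as implied by the methods above (a summary added for
    # readability, not a guarantee of the on-disk format):
    #   <storage>/<subdir>/writing.<logid>   the logfile currently written,
    #                                        <logid> is its first log id
    #   <storage>/<subdir>/done.<logid>      finished, rotated logfiles
    #   <storage_dir>/logfile.list{,.new,.old,.switch}
    #                                        bookkeeping files for loglist
    #                                        switching
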
    def is_stream_wbopen(self):
        """return True if the write stream is open"""
        return self._writestream is not None

    def _get_next_logfile(self, logid):
        """get the name of the next logfile"""
        folder = self._get_storage_dir(logid=logid)
        fname = '{0}/writing.{1}'.format(folder, logid)
        return fname

    def get_subdir(self, log_id=-1):
        """get log dir"""
        return "0"

    def _get_ordered_logfiles(self, folder):
        """get log files in order"""
        try:
            # Python 2: sorted() accepts a cmp function
            files = sorted(os.listdir(folder), cmp=self.__cmp_logfile_id)
        except TypeError:
            # Python 3: cmp is gone, wrap the comparator with cmp_to_key
            import functools
            files = sorted(
                os.listdir(folder),
                key=functools.cmp_to_key(self.__cmp_logfile_id)
            )
        retfiles = []
        for item in files:
            if any([
                    len(item.split('.')) != 2,
                    item.find('done.') < 0 and item.find('writing.') < 0
            ]):
                log.info('file name {0} invalid, will skip'.format(item))
                continue
            retfiles.append(item)
        return retfiles

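    # A hedged example of the resulting order: for a folder holding
    # ['writing.200', 'done.100', 'done.9'], _get_ordered_logfiles returns
    # ['done.9', 'done.100', 'writing.200'] -- prefixes compare as strings
    # ('done' < 'writing') and log ids compare numerically.
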
    def set_current_logid(self, logid):
        """reset the current log id"""
        if logid < 0:
            raise ValueError('cannot set logid less than 0')
        self._logid = logid
        fname = self._get_next_logfile(self._logid)
        if not self._stream_wbopen(fname):
            log.error('failed to open stream, return False')
            return False
        log.info('reset current log id to {0}'.format(logid))
        return True

    def _write_data(self, binary):
        """write data into the local file"""
        try:
            self._writestream.write(binary)
            self._writestream.flush()
            os.fsync(self._writestream.fileno())
            self._logid += 1
        # pylint: disable=W0703
        # need to catch such a general exception
        except Exception as error:
            log.error(
                'failed to write data into LocalFileSerilizer, err:{0}'.format(
                    error)
            )
            return False
        return True

    def _check_need_new_logfile(self):
        """check if a new log file is needed"""
        if os.path.exists(self._loglist_switch) and \
                (not self._loglist_switching):
            # start to switch
            log.info('{0} loglist start to switch'.format(self._name))
            self._loglist_switching = True
            self._loglist_stream.write('NEED_SWITCH_LOCALFILE\n')
            self._loglist_stream.flush()
            os.fsync(self._loglist_stream.fileno())
            self._loglist_stream.close()
            if not os.path.exists(self._logfile_listnew):
                try:
                    exfile.mk_newnode(self._logfile_listnew)
                # pylint: disable=W0703
                except Exception as err:
                    log.error('switch loglist file failed:{0}'.format(err))
                    return False
            log.info('{0} loglist file {1} switched'.format(
                self._name, self._logfile_list)
            )
            os.rename(self._logfile_list, self._logfile_listold)
            os.rename(self._logfile_listnew, self._logfile_list)
            self._loglist_stream = open(self._logfile_list, 'a')
            self._loglist_switching = False
        if self._current_filesize >= self._max_log_file_size:
            last_logid = self._writestream.name.split('.')[-1]
            newname = os.path.normpath('{0}/done.{1}'.format(
                os.path.dirname(self._writestream.name), last_logid
            ))
            log.info(
                'finish one log file, logid range:{0}-{1}'.format(
                    last_logid, self._logid - 1
                )
            )
            os.rename(self._writestream.name, newname)
            self._stream_close()
            self._loglist_stream.write('{0}\n'.format(newname))
            self._loglist_stream.flush()
            os.fsync(self._loglist_stream.fileno())
            self._current_filesize = 0
            fname = self._get_next_logfile(self._logid)
            if not self._stream_wbopen(fname):
                return False
        return True

    def is_empty(self):
        """return True if there is no log"""
        folder = self._get_storage_dir()
        files = self._get_ordered_logfiles(folder)
        return len(files) < 1

    def switch_logfilelist(self):
        """switch logfile list to logfile.list.old"""
        log.info('serializer ({0}) to switch loglist'.format(self._name))
        if not os.path.exists(self._loglist_switch):
            exfile.mk_newnode(self._loglist_switch)

    def add_log(self, log_type, log_mode, log_binary):
        """add a log record into the local file

        :return:
            a tuple (result_True_or_False, logid_or_None)
        """
        self._mlock.acquire()
        ret = (True, None)
        if not self.is_stream_wbopen():
            fname = self._get_next_logfile(self._logid)
            if not self._stream_wbopen(fname):
                ret = (False, None)
        if ret[0]:
            # binary :=
            # 32bit len | 128bit logid | 16bit log_type | 16bit log_mode | bin
            bin_logid = self.asign_uint2byte_bybits(self._logid, 128)
            bin_type = self.asign_uint2byte_bybits(log_type, 16)
            bin_mode = self.asign_uint2byte_bybits(log_mode, 16)
            data = '{0}{1}{2}{3}'.format(
                bin_logid, bin_type, bin_mode, log_binary
            )
            data_len = len(data)
            str_data_len = self.asign_uint2byte_bybits(
                data_len, self._record_lenbytes * 8)
            write_data = '{0}{1}'.format(str_data_len, data)
            log.info(
                '{0} add_log, type {1}, mode {2}, logid {3}, '
                'datalen:{4}'.format(
                    self._name, log_type, log_mode, self._logid, data_len)
            )
            if self._write_data(write_data):
                self._current_filesize += (data_len + len(str_data_len))
                if not self._check_need_new_logfile():
                    ret = (False, None)
                else:
                    ret = (True, self._logid)
            else:
                log.error('failed to add_log (type:{0}, mode:{1})'.format(
                    log_type, log_mode)
                )
                ret = (False, None)
        self._mlock.release()
        return ret

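    # A hedged write-path sketch for add_log; "/tmp/cup_logs" and the
    # log_type value 0 are hypothetical, application-defined choices:
    #
    #     ser = LocalFileSerilizer('/tmp/cup_logs')
    #     ser.open4write()
    #     ok, logid = ser.add_log(0, LOG_MOD_ADD, 'payload-bytes')
    #     ser.close_write()
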
    def purge_data(self, before_logid):
        """
        purge log files whose records are all less than before_logid
        """
        folder = self._get_storage_dir()
        logfiles = self._get_ordered_logfiles(folder)
        last_logid = None
        last_fname = None
        purge_list = []
        for fname in logfiles:
            if fname.find('writing') >= 0:
                continue
            current = int(fname.split('.')[-1])
            if last_logid is not None and (current - 1) < before_logid:
                purge_list.append(last_fname)
            last_fname = fname
            last_logid = current
        log.info('log files with id < before_logid will be purged:{0}'.format(
            purge_list)
        )
        ind = 0
        for fname in purge_list:
            full = '{0}/{1}'.format(folder, fname)
            log.info('to purge log file:{0}'.format(full))
            try:
                os.remove(full)
                ind += 1
            # StandardError is Python 2 only; Exception works on both
            except Exception as err:
                log.error(
                    'failed to purge log file:{0}, {1}'.format(full, err)
                )
            # throttle a bit after every 1000 removals
            if ind % 1000 == 0:
                time.sleep(0.1)
        log.info(
            '{0} purge data finished, to switch loglist'.format(self._name)
        )
        self.switch_logfilelist()

    def _do_open4read(self, start_logid=-1):
        """
        open the load stream for read

        :TODO: read starting from logid
        """
        load_dir = self._get_storage_dir(logid=self._logid)
        self._buffer_files = self._get_ordered_logfiles(load_dir)
        to_open = None
        if len(self._buffer_files) <= 0:
            log.warn('does not have any log record yet')
            return False
        if -1 == start_logid:
            to_open = self._buffer_files[0]
        else:
            pass
            # if self._load_stream is None:
            #     log.error('load stream should not be None')
            #     return False
            # name = os.path.basename(self._load_stream.name)
            # ind = -1
            # try:
            #     ind = self._buffer_files.index(name)
            # except ValueError:
            #     log.error('does not find the log: {0}'.format(name))
            #     return False
            # to_open = self._buffer_files[ind]
        try:
            fname = '{0}/{1}'.format(load_dir, to_open)
            self._load_stream = open(fname, 'rb')
            return True
        # pylint:disable=W0703
        # need such a broad exception
        except Exception as err:
            log.error('failed to open log stream:{0}'.format(err))
            return False

    def close_read(self):
        """close the stream opened by open4read"""
        if self._load_stream is not None:
            self._load_stream.close()

    def open4read(self):
        """open logs for read"""
        if self._load_stream is None:
            if not self._do_open4read():
                log.error('failed to open4read, return')
                return False
        return True

    def open4write(self, truncate_last_failure=True):
        """
        open for write

        :raise Exception:
            raise IOError(errmsg) if any IOError is encountered
        """
        try:
            if not os.path.exists(self._logfile_list):
                exfile.mk_newnode(self._logfile_list)
            self._loglist_stream = open(self._logfile_list, 'a')
        except Exception as err:
            log.error('cannot create loglist, raise IOError')
            raise IOError('cannot create loglist, {0}'.format(err))
        log.info(
            '{0} try to recover from last '
            'write if there is any need, truncate_last_failure:{1}'.format(
                self._name, truncate_last_failure)
        )
        self._recover_from_lastwriting(truncate_last_failure)

    def close_write(self):
        """close the writer"""
        if self._writestream is not None:
            self._writestream.close()

    def _try_read_one_log(self, stream):
        """
        try to read one log record from the stream

        :return:
            a tuple of (LOGFILE_GOOD/LOGFILE_EOF/LOGFILE_BAD_RECORD,
            LogRecord or None)
        """
        if stream is None:
            return (LOGFILE_EOF, None)
        str_datalen = datalen = str_data = None
        try:
            str_datalen = stream.read(self._record_lenbytes)
            if len(str_datalen) == 0:
                return (LOGFILE_EOF, None)
            if len(str_datalen) < self._record_lenbytes:
                log.warn('got a bad log from stream:{0}'.format(stream.name))
                return (LOGFILE_BAD_RECORD, None)
            datalen = self.convert_bytes2uint(str_datalen)
            str_data = stream.read(datalen)
            if len(str_data) < datalen:
                log.warn(
                    'got less data than data len from stream:{0}'.format(
                        stream.name)
                )
                return (LOGFILE_BAD_RECORD, None)
            # 128bit logid | 16bit log_type | 16bit log_mode | log binary
            log_id = self.convert_bytes2uint(str_data[0: 16])
            log_type = self.convert_bytes2uint(str_data[16: 16 + 2])
            log_mode = self.convert_bytes2uint(str_data[18: 18 + 2])
            log_binary = str_data[20:]
            return (
                LOGFILE_GOOD,
                LogRecord(
                    datalen, log_id, log_type, log_mode, log_binary
                )
            )
        except Exception as err:
            log.error('failed to parse log record:{0}'.format(err))
            log.error(traceback.format_exc())
            return (LOGFILE_BAD_RECORD, None)

    def _move2next_load_fname(self):
        """move the load stream to the next log file"""
        folder = self._get_storage_dir()
        fname = os.path.basename(self._load_stream.name)
        files = self._get_ordered_logfiles(folder)
        length = len(files)
        ind = -1
        try:
            ind = files.index(fname)
        except ValueError:
            log.error('cannot find current log stream:{0}'.format(fname))
            return LOGFILE_BAD_RECORD
        newfile = None
        if ind < (length - 2):
            newfile = '{0}/{1}'.format(folder, files[ind + 1])
        elif ind == (length - 2):
            if files[length - 1].find('writing') < 0:
                newfile = '{0}/{1}'.format(folder, files[length - 1])
            else:
                log.debug('does not have more finished log edits to read')
                return LOGFILE_EOF
        elif ind == (length - 1):
            log.info('does not have more log edits to read, return')
            return LOGFILE_EOF
        try:
            self._load_stream.close()
            self._load_stream = open(newfile, 'rb')
            return LOGFILE_GOOD
        # StandardError is Python 2 only; Exception works on both
        except Exception as err:
            log.error('failed to move to next load stream:{0}'.format(newfile))
            log.error('err:{0}'.format(err))
            return LOGFILE_BAD_RECORD

    def read(self, record_num=128):
        """
        load logs into memory

        :notice:
            if skip_badlog is not True, an IOError will be raised when the
            stream encounters any error. Otherwise, the stream will skip
            the bad log file, move to the next one and continue reading

        :return:
            a. a list of up to "record_num" LogRecords

            b. if the length of the list is less than record_num, the
            stream hit EOF; please read again later

            c. if the returned list is empty, the stream got nothing;
            please try again later
        """
        recordlist = []
        count = 0
        move2nextstream = False
        while count < record_num:
            ret, retval = self._try_read_one_log(self._load_stream)
            if ret == LOGFILE_EOF:
                # need to read the next log file
                move2nextstream = True
            elif ret == LOGFILE_GOOD:
                recordlist.append(retval)
                count += 1
                continue
            elif ret == LOGFILE_BAD_RECORD:
                if not self._skip_badlog:
                    raise IOError(
                        'found bad records in {0}'.format(
                            self._load_stream.name)
                    )
                else:
                    log.warn(
                        'Bad record! '
                        'But skip_badlog is on, will skip the file:{0}'.format(
                            self._load_stream.name)
                    )
                    move2nextstream = True
            if move2nextstream:
                move2nextstream = False
                ret = self._move2next_load_fname()
                if LOGFILE_EOF == ret:
                    log.debug('no more log edits to read, please retry')
                    break
                elif LOGFILE_GOOD == ret:
                    log.debug('moved to next log edit file, to read new log')
                    continue
                elif LOGFILE_BAD_RECORD == ret:
                    log.error('IOError happened, read_logs failed')
                    if self._skip_badlog:
                        log.error('skip_badlog is on, try moving to next one')
                        move2nextstream = True
                        continue
                    else:
                        raise IOError(
                            'encountered bad records, raise exception'
                        )
        return recordlist

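    # A hedged read-path sketch for read, mirroring the write sketch after
    # add_log ("/tmp/cup_logs" is hypothetical):
    #
    #     ser = LocalFileSerilizer('/tmp/cup_logs', skip_badlog=False)
    #     if ser.open4read():
    #         for record in ser.read(record_num=64):
    #             print(record.log_id, record.log_type, record.log_binary)
    #         ser.close_read()
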
# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent