Source code for cup.net.asyn.msg

#!/usr/bin/env python
# -*- coding: utf-8 -*
# Copyright: [CUP] - See LICENSE for details.
# Authors: Guannan Ma (@mythmgn)
"""
:description:
    netmsg related module
"""
import os

import cup
from cup import log
from cup import platforms
from cup.util import misc
from cup.util import generator
from cup.net.asyn import common


MSG_RESENDING_FLAG = 0
MSG_RESEND_SUCCESS = 1
MSG_TIMEOUT_TO_DELETE = 2
MSG_DELETE_FLAG = 3

PY3_DEFAULT_ENCODING = 'utf8'


__all__ = ['CMsgType', 'CMsgFlag', 'CNetMsg', 'CAckMsg', 'netmsg_tostring']


MSG_TYPE2NUM = {
    'HEART_BEAT': 1,
    'RESOURCE_ACQUIRE': 2,
    'RESOURCE_RELEASE': 3,
    'ACK_OK': 4,
    'ACK_FAILURE': 5,
    'ACK_HEART_BEAT': 6,
    'ACK_CREATE': 7,
    'NEED_ACK': 8
}


MSG_FLAG2NUM = {
    'FLAG_URGENT': 0X00000001,
    'FLAG_NORMAL': 0X00000002,
    'FLAG_NEEDACK':0X00000004,
    'FLAG_ACK': 0X00000008,
}


@cup.decorators.Singleton
class CMsgType(object):
    """
    for netmsg types
    """
    def __init__(self):
        self._type2number = {}
        self._number2type = {}

    def register_types(self, kvs):
        """
        register types
        """
        for key_value in kvs.items():
            self._type2number[key_value[0]] = key_value[1]
            self._number2type[str(key_value[1])] = key_value[0]

    def gettype_bynumber(self, number):
        """
        get type by number
        """
        return self._number2type[str(number)]

    def getnumber_bytype(self, str_type):
        """
        get number by type
        """
        return self._type2number[str_type]


@cup.decorators.Singleton
class CMsgFlag(object):
    """
    msg flag class
    """
    def __init__(self):
        self._flag2number = {}
        self._number2flag = {}

    def register_flags(self, kvs):
        """
        register flags
        """
        for key_value in kvs.keys():
            self._flag2number[key_value[0]] = key_value[1]
            self._number2flag[str(key_value[1])] = key_value[0]

    def getflag_bynumber(self, number):
        """
        get flag by number
        """
        return self._number2flag[str(number)]

    def getnumber_byflag(self, str_flag):
        """
        get number by flag
        """
        return self._flag2number[str_flag]


[docs]class CNetMsg(object): """ CNetMsg flag: System use only. type: System will use type > 65535. Users will use type <=65535 #head CUP012-3 for building connection #len - uint64 #fromip,port, stub -uint64 #toip,port, stub -uint64 #msg_type -uint32 #uniqid -128bit [64bit ip, port, 64 bit, uniqid] #body -no limit (length:uint64) """ # length 8 MSG_SIGN = 'CUP012-3' _ORDER = [ 'head', 'flag', 'len', 'from', 'to', 'type', 'uniq_id', 'body' ] _SIZE_EXCEPT_BODY = 72 _SIZE_EXCEPT_HEAD_BODY = 64 _ORDER_COUNTS = 8 # Default flags MSG_FLAG_MAN = CMsgFlag() MSG_FLAG_MAN.register_flags(MSG_FLAG2NUM) # Default types. MSGTYPE = CMsgType() _SYS_MSG_TYPES = { 'ACK': 65536 } MSGTYPE.register_types(_SYS_MSG_TYPES) def __init__(self, is_postmsg=True): #super(self.__class__, self).__init__() self._ORDER_BYTES = [8, 4, 8, 16, 16, 4, 16, 0] self._is_postmsg = is_postmsg self._need_head = True self._data = {} self._read_order = 0 self._writeindex = 0 self._msg_finish = False self._context = None self._msglen = None self._bodylen = None self._type = None self._uniqid = None self._fromaddr = None self._toaddr = None self._dumpdata = None self._flag = None # self._del_timeout = None self._resend_flag = None self._resend_times = 0 if is_postmsg: self.set_flag( MSG_FLAG2NUM['FLAG_NORMAL'] ) # for CNeedAckMsg self._errmsg = None self._retry_interval = None self._total_timeout = None self._last_retry_time = None self._callback_func = None self._resend_flag = MSG_RESENDING_FLAG def __del__(self): """del the msg""" if 'body' in self._data: del self._data['body'] if self._data is not None: del self._data if self._dumpdata is not None: del self._dumpdata
[docs] def get_order_counts(self): """ get order counts """ return self._ORDER_COUNTS
@classmethod def _asign_uint2byte_bybits(cls, num, bits): asign_len = bits / 8 tmp = b'' i = 0 while True: quotient = int(num / 256) remainder = num % 256 tmp += chr(remainder) if quotient < 256: tmp += chr(quotient) break else: num = quotient i += 1 length = len(tmp) if length < asign_len: for _ in range(0, asign_len - length): tmp += chr(0) return tmp @classmethod def _convert_bytes2uint(cls, str_data): num = 0 b_ind = 0 for i in str_data: num += pow(256, b_ind) * ord(i) b_ind += 1 return num
[docs] def push_data(self, data): """ push data into the msg. Return pushed length. Return -1 if we should shutdown the socket channel. :raise exception: may raise IndexError when coming msg has problems. """ if self._msg_finish: log.warn('The CNetMsg has already been pushed enough data') return 0 if len(data) == 0: log.warn( 'You just pushed into the msg with a zero-length data' ) return 0 if platforms.is_py3(): if isinstance(data, str): data = data.encode(PY3_DEFAULT_ENCODING) sign = True data_ind = 0 data_max = len(data) # log.info('msg data read-order:{0}, context:{1}'.format(self._read_order, # self.get_msg_context().get_context_info())) data_key = self._ORDER[self._read_order] while sign: # One loop handle one data_key until there all the data is handled. try: self._data[data_key] except KeyError: self._data[data_key] = b'' loop_data_max = ( self._ORDER_BYTES[self._read_order] - len(self._data[data_key]) ) if (data_max - data_ind) >= loop_data_max: # can fill up the msg self._data[data_key] += ( data[data_ind: loop_data_max + data_ind] ) data_ind += loop_data_max self._read_order += 1 data_key_info = b'' if data_key == 'head': data_key_info = self._data[data_key] if self._data[data_key] != self.MSG_SIGN: return -1 elif data_key == 'flag': data_key_info = self.get_flag() elif data_key == 'len': total_len = self.get_msg_len() if self._need_head: self._ORDER_BYTES[7] = total_len - self._SIZE_EXCEPT_BODY else: self._ORDER_BYTES[7] = ( total_len - self._SIZE_EXCEPT_HEAD_BODY ) data_key_info = self.get_msg_len() elif data_key == 'from': data_key_info = self.get_from_addr() elif data_key == 'uniq_id': data_key_info = self.get_uniq_id() elif data_key == 'type': data_key_info = self.get_msg_type() elif data_key == 'body': data_key_info = len(self._data['body']) if self._read_order >= self._ORDER_COUNTS: self._msg_finish = True sign = False log.debug( 'congratulations. ' 'This msg({0} {1}) has been filled'.format( self.get_uniq_id(), self.get_msg_context().get_context_info()) ) break data_key = self._ORDER[self._read_order] else: # cannot fill up the msg in this round sign = False push_bytes = data_max - data_ind # self._data[data_key] += data[data_ind: data_max] self._data[data_key] += data[data_ind:] data_ind += push_bytes return data_ind
def _addr2pack(self, ip_port, stub_future): misc.check_type(ip_port, tuple) misc.check_type(stub_future, tuple) pack = common.ip_port2connaddr(ip_port) pack = common.add_stub2connaddr(pack, stub_future[0]) pack = common.add_future2connaddr(pack, stub_future[1]) return pack
[docs] def set_flag(self, flag): """ set flag for the msg """ # misc.check_type(flag, int) self._flag = flag self._data['flag'] = self._asign_uint2byte_bybits(flag, 32)
[docs] def set_resend_flag(self, handle_flag): """ set msg handle flag """ self._resend_flag = handle_flag
[docs] def get_resend_flag(self): """ get msg handle flag """ return self._resend_flag
[docs] def add_flag(self, flag): """add flag into the msg""" self._flag = self._flag | flag self._data['flag'] = self._asign_uint2byte_bybits(self._flag, 32)
[docs] def set_need_head(self, b_need=False): """ :note: By default, the msg does not need to have a head unless it's the first msg that posted/received. """ self._need_head = b_need if b_need: self._read_order = 0 else: self._read_order = 1 if self._is_postmsg and self._need_head: self._data['head'] = self.MSG_SIGN
@classmethod def _check_addr(cls, ip_port, stub_future): ip, port = ip_port stub, future = stub_future misc.check_type(ip, str) def _set_msg_len(self): if self._need_head: size_except_body = self._SIZE_EXCEPT_BODY else: size_except_body = self._SIZE_EXCEPT_HEAD_BODY body_len = len(self._data['body']) self._ORDER_BYTES[7] = body_len self._msglen = body_len + size_except_body self._data['len'] = self._asign_uint2byte_bybits( self._msglen, 64 ) tempstr = b'' for i in range(0, self._ORDER_COUNTS - 1): if i == 0 and (not self._need_head): continue tempstr += self._data[self._ORDER[i]] self._dumpdata = '{0}{1}'.format(tempstr, self._data['body'])
[docs] def set_from_addr(self, ip_port, stub_future): """ set msg from addr """ self._check_addr(ip_port, stub_future) pack = self._addr2pack(ip_port, stub_future) self._data['from'] = self._asign_uint2byte_bybits(pack, 128) self._fromaddr = (ip_port, stub_future)
[docs] def set_to_addr(self, ip_port, stub_future): """ set msg to addr """ self._check_addr(ip_port, stub_future) pack = self._addr2pack(ip_port, stub_future) self._data['to'] = self._asign_uint2byte_bybits(pack, 128) self._toaddr = (ip_port, stub_future)
[docs] def set_msg_type(self, msg_type): """ set msg type """ misc.check_type(msg_type, int) self._data['type'] = self._asign_uint2byte_bybits(msg_type, 32) self._type = msg_type
[docs] def set_uniq_id(self, uniq_id): """ set msg unique id """ # misc.check_type(uniq_id, int) self._data['uniq_id'] = self._asign_uint2byte_bybits(uniq_id, 128) self._uniqid = uniq_id
[docs] def set_body(self, body): """ set msg body """ misc.check_type(body, str) self._data['body'] = body self._bodylen = len(body)
def _pack_toaddr(self, pack): (ip, port) = common.get_ip_and_port_connaddr(pack) stub = common.getstub_connaddr(pack) future = common.getfuture_connaddr(pack) return ((ip, port), (stub, future))
[docs] def get_flag(self): """ get msg flag """ if self._flag is None: self._flag = self._convert_bytes2uint(self._data['flag']) return self._flag
[docs] def get_to_addr(self): """ get to addr """ if self._toaddr is None: pack = self._convert_bytes2uint(self._data['to']) self._toaddr = self._pack_toaddr(pack) return self._toaddr
[docs] def get_from_addr(self): """ get from addr. ((ip, port), (stub, future)) """ if self._fromaddr is None: pack = self._convert_bytes2uint(self._data['from']) self._fromaddr = self._pack_toaddr(pack) return self._fromaddr
[docs] def get_msg_type(self): """ get msg type """ if self._type is None: self._type = self._convert_bytes2uint(self._data['type']) return self._type
[docs] def get_msg_len(self): """ get msg len """ if self._msglen is None: self._msglen = self._convert_bytes2uint(self._data['len']) return self._msglen
[docs] def get_uniq_id(self): """ get unique msg id """ if self._uniqid is None: self._uniqid = self._convert_bytes2uint(self._data['uniq_id']) return self._uniqid
[docs] def get_body(self, return_unicode=False): """ get msg body :param return_unicode: False by default, return str (in py2), bytes in py3. Return unicode if return_unicode is True """ if 'body' not in self._data: raise KeyError('Body not set yet') if not return_unicode: return self._data['body'] else: return self._data['body'].decode(PY3_DEFAULT_ENCODING)
[docs] def get_bodylen(self): """ get body length """ return self._bodylen
[docs] def is_a_sendmsg(self): """ is a msg being sent """ return self._is_postmsg
[docs] def is_a_recvmsg(self): """ is a msg being received """ return (not self._is_postmsg)
[docs] def is_recvmsg_complete(self): """ is msg received already """ if not self._is_postmsg and self._msg_finish: return True else: return False
# the head in self._data should be set before sent. # Thus, the self._data.keys will be 1 less than # self.get_order_counts()
[docs] def is_sendmsg_complete(self): """ is msg sent complete """ if self._need_head: size_except_body = self._SIZE_EXCEPT_BODY else: size_except_body = self._SIZE_EXCEPT_HEAD_BODY if (self._bodylen + size_except_body) == self._msglen: return True else: return False
[docs] def get_write_bytes(self, length): """ get write bytes from the msg """ if length <= 0: return # log.debug( # 'to get {0} write bytes from msg, ' # '_writeindex:{1}, msg total_len: {2}'.format( # length, self._writeindex, len(self._dumpdata) # ) # ) return self._dumpdata[self._writeindex: self._writeindex + length]
[docs] def seek_write(self, length_ahead): """ seek foreward by length """ # log.debug( # 'to seek msg length {0}, now index {1}'.format( # length_ahead, self._writeindex)) self._writeindex += length_ahead if self._writeindex > self.get_msg_len(): raise cup.err.AsyncMsgError( 'You have seek_write out of the msg length' )
[docs] def is_msg_already_sent(self): """ is msg already sent """ if self._writeindex == self.get_msg_len(): return True else: return False
#this function is only used by msg which need to be ack #need ack msg
[docs] def pre_resend(self): """ set writeindex """ self._writeindex = 0 self._msg_finish = False
[docs] def set_errmsg(self, errmsg): """set errmsg when we encounter errors sending it out""" self._errmsg = errmsg
[docs] def get_errmsg(self): """get errmsg if we encounter errors sending it out""" return self._errmsg
[docs] def set_total_timeout(self, total_timeout): """ set total_timeout """ self._total_timeout = total_timeout
[docs] def set_retry_interval(self, retry_interval): """ set retry_interval """ self._retry_interval = retry_interval
[docs] def set_callback_function(self, function): """ set function """ self._callback_func = function
[docs] def set_last_retry_time(self, last_retry_time): """ set last_retry_time """ self._last_retry_time = last_retry_time
[docs] def get_total_timeout(self): """ get total_timeout """ return self._total_timeout
[docs] def get_retry_interval(self): """ get retry_interval """ return self._retry_interval
[docs] def get_callback_function(self): """ get callback function """ return self._callback_func
[docs] def get_last_retry_time(self): """ get last_retry_time """ return self._last_retry_time
[docs] def set_retry_times(self, num): """set msg retry times""" self._resend_times = num
[docs] def add_retry_times(self): """add retry times""" self._resend_times += 1
[docs] def get_retry_times(self): """get retry times""" return self._resend_times
[docs] def set_msg_context(self, context): """ set up context for this netmsg """ self._context = context
[docs] def get_msg_context(self): """ get msg context """ return self._context
[docs] def is_valid4send(self, netmsg): """ for future use """ return (True, None)
# pylint: disable=R0904 class CNeedAckMsg(CNetMsg): """ Class need ack msg """ def __init__(self, retry_interval, total_timeout, function): """ :param function: Whether succeed or not, the framework will invoke the function passed in. """ CNetMsg.__init__(self, is_postmsg=True) self.add_flag(MSG_FLAG2NUM['FLAG_NEEDACK']) self._retry_interval = retry_interval self._total_timeout = total_timeout self._last_retry_time = None self._callback_func = function self._resend_flag = MSG_RESENDING_FLAG # pylint: disable=R0904
[docs]class CAckMsg(CNetMsg): """ ack msg example """ def __init__(self, is_postmsg=True): CNetMsg.__init__(self, is_postmsg) self.add_flag(MSG_FLAG2NUM['FLAG_ACK'])
[docs]def netmsg_tostring(netmsg): """ get printable netmsg """ msg = ( 'netmsg, from {0} to {1}, uniqid {2}, msg_type {3}, flag {4}, ' 'body_len {5}'.format( str(netmsg.get_from_addr()), str(netmsg.get_to_addr()), netmsg.get_uniq_id(), netmsg.get_msg_type(), netmsg.get_flag(), netmsg.get_bodylen() ) ) return msg
if __name__ == '__main__': gen = generator.CycleIDGenerator('127.0.0.1', '5000') gen_id = gen.next_id() str_num = CNetMsg._asign_uint2byte_bybits(gen_id, 128) # vi:set tw=0 ts=4 sw=4 nowrap fdm=indent