Source code for cup.net.asyn.context
#!/usr/bin/env python
# -*- coding: utf-8 -*
# -*- coding: utf-8 -*
# Copyright: [CUP] - See LICENSE for details.
# Authors: Guannan Ma (@mythmgn),
"""
:description:
Connection Context for each socket
"""
import copy
import time
import threading
import traceback
try:
import Queue as queue # pylint: disable=F0401
except ImportError:
import queue # pylint: disable=F0401
import cup
from cup import log
from cup.util import misc
from cup.net.asyn import msg as async_msg
__all__ = [
'CConnContext'
]
[docs]class CConnContext(object): # pylint: disable=R0902
"""
connection context for each socket
"""
CONTEXT_QUEUE_SIZE = 200
def __init__(self):
self._destroying = False
self._sock = None
self._peerinfo = None
self._sending_msg = None
self._send_queue = queue.PriorityQueue(self.CONTEXT_QUEUE_SIZE)
self._recving_msg = None
self._recv_queue = queue.PriorityQueue(self.CONTEXT_QUEUE_SIZE)
self._msgind_in_sendque = 0
self._is_reading = None
self._is_1st_recv_msg = True
self._is_1st_send_msg = True
self._conn = None
self._lock = threading.Lock()
self._readlock = threading.Lock()
self._writelock = threading.Lock()
self._retry_interval = None
self._total_timeout = None
self._last_retry_time = None
self._function = None
self._resend_flag = None
self._listened_peer = None
[docs] def to_destroy(self):
"""
destroy context
"""
self._lock.acquire()
self._destroying = True
if self._sock is None:
msg = 'context is with no sock'
else:
msg = 'context with socket: {0}, peer:{1}'.format(
self._sock, self.get_peerinfo())
log.debug('({0}) is to be destroyed'.format(msg))
self._lock.release()
[docs] def is_detroying(self):
"""
is context being destroyed
"""
self._lock.acquire()
is_destryoing = self._destroying
self._lock.release()
return is_destryoing
[docs] def set_destoryed(self):
"""set context to destroyed status"""
self._lock.acquire()
self._destroying = False
self._lock.release()
[docs] def set_conn_man(self, conn):
"""
set conn for context
"""
self._conn = conn
[docs] def set_sock(self, sock):
"""
associate socket
"""
self._lock.acquire()
self._sock = copy.copy(sock)
self._lock.release()
[docs] def get_sock(self):
"""
return associated socket
"""
misc.check_not_none(self._sock)
sock = self._sock
return sock
[docs] def do_recv_data(self, data, data_len):
"""
push data into the recving_msg queue
network read should be in 1 thread only.
"""
if self._recving_msg is None:
raise cup.err.NotInitialized('self._recving_msg')
try:
ret = self._recving_msg.push_data(data)
except IndexError as error:
log.warn('index error/msg len error happened:{0}'.format(error))
log.warn(traceback.format_exc())
log.warn('receive a msg that cannot handle, close the socket')
self.to_destroy()
return
if ret < 0:
log.warn(
'receive an wrong socket msg, to close the peer:{0}'.format(
self.get_peerinfo()
)
)
self.to_destroy()
self._conn.cleanup_error_context(self)
return
if data_len >= ret:
if self._recving_msg.is_recvmsg_complete():
self._is_1st_recv_msg = False
self._conn.get_recv_queue().put(
(self._recving_msg.get_flag(), self._recving_msg)
)
if self.get_listened_peer() is None:
listened_peer = self._recving_msg.get_from_addr()[0]
self.set_listened_peer(listened_peer)
log.info(
'set listened peer {0} for this context({1})'.format(
listened_peer, self._peerinfo)
)
self._recving_msg = None
if self._conn.get_recv_queue().qsize() >= 500:
time.sleep(0.1)
self.move2recving_msg()
# the pushed data should span on two msg datas
if data_len > ret:
return self.do_recv_data(data[ret:], (data_len - ret))
else:
log.error(
'Socket error. We cannot get more than pushed data length'
)
assert False
return
[docs] def move2recving_msg(self):
"""
get the net msg being received
"""
# if no recving msg pending there, create one.
if self._recving_msg is None:
self._recving_msg = async_msg.CNetMsg(is_postmsg=False)
self._recving_msg.set_msg_context(self)
if self._is_1st_recv_msg:
self._recving_msg.set_need_head(True)
else:
self._recving_msg.set_need_head(False)
[docs] def try_move2next_sending_msg(self):
"""
move to next msg that will be sent
"""
if self._sending_msg is None or \
self._sending_msg.is_msg_already_sent():
try:
item = self._send_queue.get_nowait()
msg = item[2]
except queue.Empty:
# log.debug('The send queue is empty')
msg = None
except Exception as error:
errmsg = (
'Catch a error that I cannot handle, err_msg:%s' %
str(error)
)
log.error(errmsg)
raise ValueError(errmsg)
self._sending_msg = msg
else:
log.debug(
'No need to move to next msg since the current one'
'is not sent out yet'
)
temp = self._sending_msg
return temp
[docs] def put_msg(self, flag, msg):
"""
Put msg into the sending queue.
:param flag:
flag determines the priority of the msg.
Msg with higher priority will have bigger chance to be
sent out soon.
:param return:
return 0 on success
return 1 on TRY_AGAIN ---- queue is full. network is too busy.
:TODO:
If the msg queue is too big, consider close the network link
"""
succ = None
self._lock.acquire()
if self._is_1st_send_msg:
msg.set_need_head(True)
# pylint: disable=W0212
msg._set_msg_len()
self._is_1st_send_msg = False
else:
msg.set_need_head(False)
msg._set_msg_len()
urgency = 1
is_urgent = flag & async_msg.MSG_FLAG2NUM['FLAG_URGENT']
if is_urgent == async_msg.MSG_FLAG2NUM['FLAG_URGENT']:
urgency = 0
try:
self._send_queue.put_nowait((urgency, self._msgind_in_sendque, msg))
self._msgind_in_sendque += 1
succ = 0
except queue.Full:
log.debug(
'network is busy. send_msg_queue is full, peerinfo:{0}'.format(
msg.get_to_addr()[0]
)
)
succ = 1
self._lock.release()
return succ
[docs] def get_context_info(self):
"""
get context info
"""
peerinfo = self.get_peerinfo()
msg = 'Peer socket(%s:%s).' % (peerinfo[0], peerinfo[1])
return msg
[docs] def set_reading(self, is_reading):
"""
set reading status
"""
self._lock.acquire()
self._is_reading = is_reading
self._lock.release()
[docs] def is_reading(self):
"""
get if it is reading
"""
self._lock.acquire()
is_reading = self._is_reading
self._lock.release()
return is_reading
[docs] def try_readlock(self):
"""
try to acquire readlock
:return:
True if succeed. False, otherwise
"""
return self._readlock.acquire(False)
[docs] def release_readlock(self):
"""
release the readlock
"""
self._readlock.release()
[docs] def try_writelock(self):
"""
:return:
True if succeed. False, otherwise
"""
return self._writelock.acquire(False)
[docs] def release_writelock(self):
"""
release the writelock
"""
self._writelock.release()
[docs] def set_peerinfo(self, peer):
"""
set peerinfo
"""
if type(peer) != tuple:
raise ValueError('peer is not a tuple')
self._peerinfo = peer
[docs] def get_peerinfo(self):
"""
get peerinfo
"""
return self._peerinfo
[docs] def get_listened_peer(self):
"""
return peer listened peer info
"""
return self._listened_peer
[docs] def set_listened_peer(self, peer):
"""
set peer listened peer
"""
self._listened_peer = peer
[docs] def get_sending_queue(self):
"""return sending queue"""
return self._send_queue
# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent