cup.net.asyn package

description:

The async module is a TCP framework for asynchronous network message transfer

Submodules

cup.net.asyn.common module

common function module for cup.net.asyn

cup.net.asyn.common.add_future2connaddr(pack, future)[source]

add future into connaddr

cup.net.asyn.common.add_stub2connaddr(pack, stub)[source]

add stub into connaddr

cup.net.asyn.common.get_ip_and_port_connaddr(pack)[source]

get (ip, port) from connaddr

cup.net.asyn.common.getfuture_connaddr(pack)[source]

get future from connaddr

cup.net.asyn.common.getip_connaddr(pack)[source]

get ip from connaddr

cup.net.asyn.common.getport_connaddr(pack)[source]

get port from connaddr

cup.net.asyn.common.getstub_connaddr(pack)[source]

get stub from connaddr

cup.net.asyn.common.ip_port2connaddr(peer)[source]
connaddr is described as a 64-bit int, although the field widths listed below add up to 96 bits:

32 - 16 - 16 - 32
ip - port - stub - future

Parameters:

peer – (ipaddr, port)

Returns:

return a connaddr
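The layout above can be illustrated with a minimal sketch. It assumes the four fields are packed from most to least significant in the order listed. `pack_connaddr` and `unpack_connaddr` are hypothetical helper names used only for illustration, not functions from cup.net.asyn.common.

```python
import socket
import struct

# Field widths taken from the layout above: ip(32) - port(16) - stub(16) - future(32).
FUTURE_BITS = 32
STUB_BITS = 16
PORT_BITS = 16


def pack_connaddr(ip, port, stub=0, future=0):
    """Pack the four fields into one Python int (hypothetical helper)."""
    ip_int = struct.unpack("!L", socket.inet_aton(ip))[0]
    return (
        (ip_int << (PORT_BITS + STUB_BITS + FUTURE_BITS))
        | (port << (STUB_BITS + FUTURE_BITS))
        | (stub << FUTURE_BITS)
        | future
    )


def unpack_connaddr(pack):
    """Split a packed value back into (ip, port, stub, future)."""
    future = pack & 0xFFFFFFFF
    stub = (pack >> FUTURE_BITS) & 0xFFFF
    port = (pack >> (STUB_BITS + FUTURE_BITS)) & 0xFFFF
    ip_int = (pack >> (PORT_BITS + STUB_BITS + FUTURE_BITS)) & 0xFFFFFFFF
    ip = socket.inet_ntoa(struct.pack("!L", ip_int))
    return ip, port, stub, future
```

Python integers are arbitrary precision, so the packed value can hold all four fields even though their widths exceed 64 bits.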

cup.net.asyn.conn module

description:

Connection-related module.

1. Only 1 thread reads/receives data from the interface.

2. More than 1 thread may write data into the network queue, with 1 thread per context (ip, port).

Note that _do_write will only TRY to send out some data. It may find that the SEND buffer queue of the TCP/IP stack for the network interface is already full.
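The "TRY to send" behaviour can be sketched with a non-blocking socket. This is an illustration only and not the module's `_do_write`; `try_send` is a hypothetical helper.

```python
import socket


def try_send(sock, data):
    """Try to send data on a non-blocking socket.

    Returns the number of bytes the kernel accepted, which may be less
    than len(data), or 0 when the send buffer is currently full.
    """
    try:
        return sock.send(data)
    except BlockingIOError:
        # The SEND buffer is full. The caller keeps the data and retries
        # once the socket is reported writable again.
        return 0
```

A caller would keep `data[sent:]` after each attempt and queue it for the next write, which is why a single call cannot be assumed to flush a whole message.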

class cup.net.asyn.conn.CConnectionManager(ip, bindport, thdpool_param)[source]

Bases: object

Uses connaddr, which encodes ip:port as a 64-bit integer.

NET_RW_SIZE = 131072
exception QueueError(msg)[source]

Bases: Exception

internal queue error for CConnectionManager class

SOCK_ALIVE_PARAMS = {'after_idle_sec': 1, 'interval_sec': 3, 'max_fails': 5}
add_write_job(context)[source]

add network write into queue

bind()[source]

bind the ip:port

cleanup_error_context(context)[source]

clean up error context

close_socket(msg, recv_socket)[source]

close socket by msg

connect(peer)[source]
Parameters:

peer – ip:port

do_check_msg_ack_loop()[source]

check msg ack loop

dump_stats()[source]

dump stats

get_needack_dict()[source]

get needack dict

get_recv_msg()[source]

get recv msg from queue

get_recv_msg_ind()[source]

get recv msg ind

get_recv_queue()[source]

get recving_msg queue

global_sock_keepalive(after_idle_sec=1, interval_sec=3, max_fails=5)[source]

Set TCP keepalive on an open socket. It activates after 1 second (after_idle_sec) of idleness, then sends a keepalive ping once every 3 seconds (interval_sec), and closes the connection after 5 failed pings (max_fails), that is, after about 15 seconds of failed probes.

Note that this applies the same settings to all sockets.

Parameters:
  • sock – socket

  • after_idle_sec – for TCP_KEEPIDLE. May not work, depending on your system

  • interval_sec – for TCP_KEEPINTVL

  • max_fails – for TCP_KEEPCNT
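For reference, the three parameters map onto standard socket options roughly as follows. This is a minimal sketch for a single socket, assuming a platform that exposes the options (each one is checked with hasattr). `set_keepalive` is a hypothetical helper and not the method documented here.

```python
import socket


def set_keepalive(sock, after_idle_sec=1, interval_sec=3, max_fails=5):
    """Enable TCP keepalive on one socket, where the platform supports it."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # The TCP_KEEP* constants are not available on every system,
    # so each one is applied only if the socket module exposes it.
    if hasattr(socket, "TCP_KEEPIDLE"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec)
    if hasattr(socket, "TCP_KEEPINTVL"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec)
    if hasattr(socket, "TCP_KEEPCNT"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails)
```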

listen_new_connect()[source]
poll()[source]

start to poll

push_msg2needack_queue(msg)[source]

push msg into the needack queue

push_msg2sendqueue(msg)[source]

push msg into the send queue

read(context)[source]

read with conn context

stop(force_stop=False)[source]

stop the connection manager

cup.net.asyn.context module

description:

Connection Context for each socket

class cup.net.asyn.context.CConnContext[source]

Bases: object

connection context for each socket

CONTEXT_QUEUE_SIZE = 200
do_recv_data(data, data_len)[source]

Push data into the recving_msg queue. Network reads should happen in 1 thread only.

get_context_info()[source]

get context info

get_listened_peer()[source]

return the listened peer info

get_peerinfo()[source]

get peerinfo

get_sending_queue()[source]

return sending queue

get_sock()[source]

return associated socket

is_detroying()[source]

is context being destroyed

is_reading()[source]

return whether the context is currently reading

move2recving_msg()[source]

get the net msg being received

put_msg(flag, msg)[source]

Put msg into the sending queue.

Parameters:

flag – determines the priority of the msg. A msg with higher priority has a better chance of being sent out soon.

Returns:

0 on success. 1 on TRY_AGAIN, meaning the queue is full and the network is too busy.

TODO:

If the msg queue is too big, consider closing the network link
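The return convention of put_msg can be sketched with a bounded priority queue. This is an illustration rather than the class's implementation: `SendQueue` and `next_msg` are hypothetical names, and the sketch assumes that a lower flag value means higher priority.

```python
import itertools
import queue

TRY_AGAIN = 1  # returned when the queue is full


class SendQueue:
    """Bounded priority queue with a 0 / TRY_AGAIN return convention."""

    def __init__(self, maxsize=200):  # 200 mirrors CONTEXT_QUEUE_SIZE
        self._queue = queue.PriorityQueue(maxsize)
        # A running counter keeps insertion order among equal flags and
        # avoids comparing the msg objects themselves.
        self._order = itertools.count()

    def put_msg(self, flag, msg):
        try:
            self._queue.put_nowait((flag, next(self._order), msg))
        except queue.Full:
            return TRY_AGAIN
        return 0

    def next_msg(self):
        _flag, _order, msg = self._queue.get_nowait()
        return msg
```

Returning a status code instead of blocking lets the caller decide whether to retry later or drop the message when the network is too busy.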

release_readlock()[source]

release the readlock

release_writelock()[source]

release the writelock

set_conn_man(conn)[source]

set conn for context

set_destoryed()[source]

set context to destroyed status

set_listened_peer(peer)[source]

set the listened peer info

set_peerinfo(peer)[source]

set peerinfo

set_reading(is_reading)[source]

set reading status

set_sock(sock)[source]

associate socket

to_destroy()[source]

destroy context

try_move2next_sending_msg()[source]

move to next msg that will be sent

try_readlock()[source]

try to acquire readlock

Returns:

True on success, False otherwise

try_writelock()[source]

try to acquire writelock

Returns:

True on success, False otherwise
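The True/False contract of the try-lock methods matches a non-blocking acquire on a standard lock. A minimal sketch, assuming a plain threading.Lock underneath (the context's real lock type is not shown here) and using the hypothetical class name `TryLock`:

```python
import threading


class TryLock:
    """Lock wrapper whose acquire attempt never blocks."""

    def __init__(self):
        self._lock = threading.Lock()

    def try_lock(self):
        # Returns True if the lock was acquired, False if someone holds it.
        return self._lock.acquire(blocking=False)

    def release(self):
        self._lock.release()
```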

cup.net.asyn.ioloop module

description:

Pollers for epoll, kqueue, and others. Refer to IOLoop from tornado.

Respect to the tornado team:

Tornado is based on Apache V2.0 License. Here it goes:

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

class cup.net.asyn.ioloop.BasePoller[source]

Bases: object

abstract fileno()[source]

return fileno of the poller

abstract modify(fd, events)[source]

modify fd to newmode

abstract poll(wait_time)[source]

poll until wait_time passes.

abstract register(fd, events)[source]

register fd

abstract unregister(fd)[source]

unregister fd

class cup.net.asyn.ioloop.Epoller[source]

Bases: BasePoller

epoll for linux and others

fileno()[source]

return fileno of epoll object

modify(fd, events)[source]

modify epoll events

poll(wait_time)[source]

start to poll

read_params()[source]

epoll read params

register(fd, events)[source]

register events for a fd

unregister(fd)[source]

unregister for epoll

write_params()[source]

epoll write params

class cup.net.asyn.ioloop.KQueuePoller[source]

Bases: BasePoller

kqueue for macos

fileno()[source]

return fileno of kqueue object

kvent_control(fd, events, flags)[source]

kevent control

modify(fd, events)[source]

modify kqueue events

poll(wait_time)[source]

kqueue poll

register(fd, events)[source]

register events for a fd

unregister(fd)[source]

unregister for kevents

class cup.net.asyn.ioloop.PollerFactory[source]

Bases: object

Poller Factory

modify(fd, events)[source]
poll(wait_time)[source]
register(fd, events)[source]
stop()[source]

stop the poller factory

unregister(fd)[source]
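How a factory might choose among the pollers documented in this module can be sketched by probing the standard select module. The selection logic of PollerFactory itself is not described here, so the order below is an assumption and `pick_poller_name` is a hypothetical helper.

```python
import select


def pick_poller_name():
    """Return the name of the best poller the current platform offers."""
    if hasattr(select, "epoll"):
        return "epoll"    # Linux
    if hasattr(select, "kqueue"):
        return "kqueue"   # macOS and the BSDs
    return "select"       # portable fallback
```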
class cup.net.asyn.ioloop.SelectPoller[source]

Bases: BasePoller

downgraded to select.select()

modify(fd, events)[source]

modify fd to newmode

poll(timeout)[source]

poll until timeout passes.

register(fd, events)[source]

register fd

unregister(fd)[source]

unregister fd
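The register/modify/unregister/poll interface can be illustrated with a small select.select() based poller. This is a sketch under assumptions: the READ and WRITE constants and the `SimpleSelectPoller` class are hypothetical, and the return shape of poll (a list of (fd, events) pairs) is chosen for the example rather than taken from the module.

```python
import select

# Hypothetical event flags for this sketch.
READ = 0x001
WRITE = 0x004


class SimpleSelectPoller:
    """Minimal poller built on select.select()."""

    def __init__(self):
        self._read_fds = set()
        self._write_fds = set()

    def register(self, fd, events):
        if events & READ:
            self._read_fds.add(fd)
        if events & WRITE:
            self._write_fds.add(fd)

    def modify(self, fd, events):
        # Replace the previous interest set for this fd.
        self.unregister(fd)
        self.register(fd, events)

    def unregister(self, fd):
        self._read_fds.discard(fd)
        self._write_fds.discard(fd)

    def poll(self, wait_time):
        readable, writable, _ = select.select(
            list(self._read_fds), list(self._write_fds), [], wait_time
        )
        events = {}
        for fd in readable:
            events[fd] = events.get(fd, 0) | READ
        for fd in writable:
            events[fd] = events.get(fd, 0) | WRITE
        return list(events.items())
```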

cup.net.asyn.msg module

description:

netmsg related module

class cup.net.asyn.msg.CAckMsg(is_postmsg=True)[source]

Bases: CNetMsg

ack msg example

class cup.net.asyn.msg.CNetMsg(is_postmsg=True)[source]

Bases: object

flag: System use only.

type: The system uses type > 65535. Users use type <= 65535.

Message layout:

  • head – CUP012-3, for building the connection

  • len – uint64

  • from ip, port, stub – uint64

  • to ip, port, stub – uint64

  • msg_type – uint32

  • uniqid – 128 bits [64-bit ip, port; 64-bit uniqid]

  • body – no limit (length: uint64)
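The field list above can be made concrete with the struct module. This is a sketch under stated assumptions, not the wire format used by CNetMsg: it assumes network byte order, no padding, that len holds the body length, and that the fields appear in the order listed. `pack_msg` and `unpack_msg` are hypothetical helpers.

```python
import struct

MSG_SIGN = b"CUP012-3"
# sign(8s) len(Q) from(Q) to(Q) msg_type(I) uniqid(16s).
# "!" selects network byte order without alignment padding (an assumption).
HEAD_FORMAT = "!8sQQQI16s"
HEAD_SIZE = struct.calcsize(HEAD_FORMAT)


def pack_msg(from_addr, to_addr, msg_type, uniq_id, body):
    """Build head + body; uniq_id must be 16 bytes, body is bytes."""
    head = struct.pack(
        HEAD_FORMAT, MSG_SIGN, len(body), from_addr, to_addr, msg_type, uniq_id
    )
    return head + body


def unpack_msg(data):
    """Parse one message and return (from, to, msg_type, uniq_id, body)."""
    sign, body_len, from_addr, to_addr, msg_type, uniq_id = struct.unpack(
        HEAD_FORMAT, data[:HEAD_SIZE]
    )
    if sign != MSG_SIGN:
        raise ValueError("unexpected message signature")
    body = data[HEAD_SIZE:HEAD_SIZE + body_len]
    return from_addr, to_addr, msg_type, uniq_id, body
```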

MSGTYPE = <cup.net.asyn.msg.CMsgType object>
MSG_FLAG_MAN = <cup.net.asyn.msg.CMsgFlag object>
MSG_SIGN = 'CUP012-3'
add_flag(flag)[source]

add flag into the msg

add_retry_times()[source]

add retry times

get_body(return_unicode=False)[source]

get msg body

Parameters:

return_unicode – False by default, which returns str in Python 2 and bytes in Python 3. If True, returns unicode.

get_bodylen()[source]

get body length

get_callback_function()[source]

get callback function

get_errmsg()[source]

get errmsg if we encounter errors sending it out

get_flag()[source]

get msg flag

get_from_addr()[source]

get from addr. ((ip, port), (stub, future))

get_last_retry_time()[source]

get last_retry_time

get_msg_context()[source]

get msg context

get_msg_len()[source]

get msg len

get_msg_type()[source]

get msg type

get_order_counts()[source]

get order counts

get_resend_flag()[source]

get msg handle flag

get_retry_interval()[source]

get retry_interval

get_retry_times()[source]

get retry times

get_to_addr()[source]

get to addr

get_total_timeout()[source]

get total_timeout

get_uniq_id()[source]

get unique msg id

get_write_bytes(length)[source]

get write bytes from the msg

is_a_recvmsg()[source]

is a msg being received

is_a_sendmsg()[source]

is a msg being sent

is_msg_already_sent()[source]

is msg already sent

is_recvmsg_complete()[source]

is msg received already

is_sendmsg_complete()[source]

is msg sent complete

is_valid4send(netmsg)[source]

for future use

pre_resend()[source]

set writeindex

push_data(data)[source]

push data into the msg. Return pushed length.

Return -1 if we should shutdown the socket channel.

Raises:

exception – may raise IndexError when the incoming msg has problems.
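The "return pushed length" contract suits a receive loop in which one network read may carry more bytes than the current message needs. A minimal sketch, assuming a body of known length and omitting the -1 shutdown case; `IncomingBody` is a hypothetical class, not CNetMsg.

```python
class IncomingBody:
    """Collects a body of known length from arbitrarily sized chunks."""

    def __init__(self, expected_len):
        self._expected_len = expected_len
        self._chunks = []
        self._received = 0

    def push_data(self, data):
        """Consume only the bytes still needed and return how many were taken."""
        needed = self._expected_len - self._received
        taken = data[:needed]
        self._chunks.append(taken)
        self._received += len(taken)
        return len(taken)

    def is_complete(self):
        return self._received == self._expected_len

    def body(self):
        return b"".join(self._chunks)
```

The caller uses the returned length to find where the leftover bytes start, since they belong to the next message.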

seek_write(length_ahead)[source]

seek forward by length

set_body(body)[source]

set msg body

set_callback_function(function)[source]

set function

set_errmsg(errmsg)[source]

set errmsg when we encounter errors sending it out

set_flag(flag)[source]

set flag for the msg

set_from_addr(ip_port, stub_future)[source]

set msg from addr

set_last_retry_time(last_retry_time)[source]

set last_retry_time

set_msg_context(context)[source]

set up context for this netmsg

set_msg_type(msg_type)[source]

set msg type

set_need_head(b_need=False)[source]
Note:

By default, the msg does not need a head unless it is the first msg posted/received.

set_resend_flag(handle_flag)[source]

set msg handle flag

set_retry_interval(retry_interval)[source]

set retry_interval

set_retry_times(num)[source]

set msg retry times

set_to_addr(ip_port, stub_future)[source]

set msg to addr

set_total_timeout(total_timeout)[source]

set total_timeout

set_uniq_id(uniq_id)[source]

set msg unique id

cup.net.asyn.msg.netmsg_tostring(netmsg)[source]

get printable netmsg

cup.net.asyn.msgcenter module

description:

msg center related module

class cup.net.asyn.msgcenter.IMessageCenter(ip, port, thdpool_param=None, stat_intvl=20)[source]

Bases: object

Message center class

close_socket(msg, recv_socket=True)[source]

close the socket by msg

default_handle(msg)[source]

default handle for msgcenter

dump_stat()[source]

dump message center stats

global_sock_keepalive(after_idle_sec=1, interval_sec=3, max_fails=5)[source]

Set TCP keepalive on an open socket. It activates after 1 second (after_idle_sec) of idleness, then sends a keepalive ping once every 3 seconds (interval_sec), and closes the connection after 5 failed pings (max_fails), that is, after about 15 seconds of failed probes.

Notice, this will set all sockets this way.

Parameters:
  • sock – socket

  • after_idle_sec – for TCP_KEEPIDLE. May not work, depending on your system

  • interval_sec – for TCP_KEEPINTVL

  • max_fails – for TCP_KEEPCNT

abstract handle(msg)[source]

handle function which should be implemented by sub-class.
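The subclassing pattern implied by the abstract handle method can be sketched without the library. Everything below is hypothetical: the class names, the `dispatch` method, the dict-shaped messages, and the rule that default_handle acts as a fallback when handle returns False are assumptions made for the example, since the interaction between the two methods is not described here.

```python
import abc


class MessageCenterBase(abc.ABC):
    """Stand-in base class with one abstract hook."""

    def __init__(self):
        self.unhandled = []

    @abc.abstractmethod
    def handle(self, msg):
        """Process msg and return True if it was handled."""

    def default_handle(self, msg):
        # Fallback for messages the subclass did not recognise.
        self.unhandled.append(msg)

    def dispatch(self, msg):
        if not self.handle(msg):
            self.default_handle(msg)


class EchoCenter(MessageCenterBase):
    """Example subclass that only understands message type 1."""

    def __init__(self):
        super().__init__()
        self.seen = []

    def handle(self, msg):
        if msg.get("type") == 1:
            self.seen.append(msg["body"])
            return True
        return False
```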

is_stopping()[source]

is msg center being stopped

post_msg(msg)[source]

post a net msg

pre_handle(msg, function)[source]

pre_handle. Internal use ONLY. Do NOT call it directly.

run()[source]

run the msgcenter

setup()[source]

setup the message center

stop(force_stop=False)[source]

stop the message center