#!/usr/bin/env python
# -*- coding: utf-8 -*
# Copyright: See LICENSE for details.
# Authors: Guannan Ma (@mythmgn),
"""
:description:
pollers for epoll and kqueue and others.
Refer IOLoop from tornado:
https://www.tornadoweb.org/en/branch2.0/_modules/tornado/ioloop.html
Respect to the tornado team:
Tornado is based on Apache V2.0 License. Here it goes:
Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
"""
import abc
import select
from cup import log
from cup import const
from cup import platforms
IONEW = 0x0
EPOLLIN = 0x001
EPOLLPRI = 0x002
EPOLLOUT = 0x004
EPOLLERR = 0x008
EPOLLHUP = 0x010
EPOLLRDHUP = 0x2000
WRITE = EPOLLOUT
READ = EPOLLIN
ERROR = EPOLLERR | EPOLLHUP | EPOLLRDHUP
[docs]class BasePoller(object):
__metaclass__ = abc.ABCMeta
[docs] @abc.abstractmethod
def fileno(self):
"""
return fileno of the poller
"""
[docs] @abc.abstractmethod
def register(self, fd, events):
"""
register fd
"""
[docs] @abc.abstractmethod
def unregister(self, fd):
"""
unregister fd
"""
[docs] @abc.abstractmethod
def modify(self, fd, events):
"""
modify fd to newmode
"""
[docs] @abc.abstractmethod
def poll(self, wait_time):
"""
poll until wait_times passes.
"""
[docs]class Epoller(BasePoller):
"""
epoll for linux and others
"""
def __init__(self):
"""
"""
self._epoll = select.epoll()
[docs] def write_params(self):
"""epoll write params"""
return (select.EPOLLET | select.EPOLLOUT | select.EPOLLERR)
[docs] def read_params(self):
"""epoll read params"""
return (select.EPOLLET | select.EPOLLIN | select.EPOLLERR)
[docs] def fileno(self):
"""return fileno of epoll object"""
return self._epoll.fileno()
[docs] def register(self, fd, events):
"""
register events for a fd
"""
events = events | select.EPOLLET | select.EPOLLERR
self._epoll.register(fd, events)
[docs] def unregister(self, fd):
"""
unregister for epoll
"""
self._epoll.unregister(fd)
[docs] def modify(self, fd, events):
"""modify kqueue events"""
self._epoll.modify(fd, events)
[docs] def poll(self, wait_time):
"""start to poll"""
return self._epoll.poll(wait_time)
[docs]class KQueuePoller(BasePoller):
"""
kqueue for macos
"""
def __init__(self):
"""
"""
self._kq = select.kqueue()
[docs] def fileno(self):
"""return fileno of kqueue object"""
return self._kq.fileno()
[docs] def register(self, fd, events):
"""
register events for a fd
"""
# NOTE: KQ_EV_CLEAR should be set as
# it means edge trigger like EPOLLET
self.kvent_control(fd, events, select.KQ_EV_ADD | select.KQ_EV_CLEAR)
[docs] def unregister(self, fd):
"""
unregister for kevents
"""
events = READ | WRITE
self.kvent_control(fd, events, select.KQ_EV_DELETE)
[docs] def modify(self, fd, events):
"""modify kqueue events"""
self.unregister(fd)
self.register(fd, events)
[docs] def kvent_control(self, fd, events, flags):
"""kevent control"""
kevents = []
if events & WRITE:
kevents.append(select.kevent(
fd, filter=select.KQ_FILTER_WRITE, flags=flags))
if events & READ or not kevents:
kevents.append(select.kevent(
fd, filter=select.KQ_FILTER_READ, flags=flags))
for kevent in kevents:
self._kq.control([kevent], 0)
[docs] def poll(self, wait_time):
"""kqueue poll"""
kevts = self._kq.control(None, 1000, wait_time)
events = {}
for kevent in kevts:
fd = kevent.ident
if kevent.filter == select.KQ_FILTER_READ:
events[fd] = events.get(fd, 0) | READ
if kevent.filter == select.KQ_FILTER_WRITE:
if kevent.flags & select.KQ_EV_EOF:
# if KQ_EV_EOF received, it means the peer has closed
# / refused the socket
events[fd] = ERROR
else:
events[fd] = events.get(fd, 0) | WRITE
if kevent.flags & select.KQ_EV_ERROR:
events[fd] = events.get(fd, 0) | ERROR
return events.items()
[docs]class SelectPoller(BasePoller):
""" downgraded to select.select()"""
def __init__(self):
"""not epoll and not kqueue, downgraded to select.select"""
self._read_fds = []
self._write_fds = []
self._error_fds = []
[docs] def register(self, fd, events):
if events & READ:
self.read_fds.add(fd)
if events & WRITE:
self.write_fds.add(fd)
if events & ERROR:
self.error_fds.add(fd)
# Closed connections are reported as errors by epoll and kqueue,
# but as zero-byte reads by select, so when errors are requested
# we need to listen for both read and error.
self.read_fds.add(fd)
[docs] def modify(self, fd, events):
self.unregister(fd)
self.register(fd, events)
[docs] def unregister(self, fd):
self.read_fds.discard(fd)
self.write_fds.discard(fd)
self.error_fds.discard(fd)
[docs] def poll(self, timeout):
readable, writeable, errors = select.select(
self.read_fds, self.write_fds, self.error_fds, timeout
)
events = {}
for fd in readable:
events[fd] = events.get(fd, 0) | READ
for fd in writeable:
events[fd] = events.get(fd, 0) | WRITE
for fd in errors:
events[fd] = events.get(fd, 0) | ERROR
return events.items()
[docs]class PollerFactory(object):
"""Poller Factory"""
def __init__(self):
"""
"""
self._polldict = []
self._poller = None
self._started = False
if hasattr(select, 'epoll'):
self._poller = Epoller()
elif hasattr(select, 'kqueue'):
self._poller = KQueuePoller()
else:
self._poller = SelectPoller()
[docs] def stop(self):
"""stop the poller factory"""
self._started = False
[docs] def register(self, fd, events):
"""
register
"""
self._poller.register(fd, events)
[docs] def modify(self, fd, events):
self._poller.modify(fd, events)
[docs] def unregister(self, fd):
"""unregister"""
self._poller.unregister(fd)
[docs] def poll(self, wait_time):
"""poll"""
self._started = True
return self._poller.poll(wait_time)
# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent