#!/usr/bin/env python
# -*- coding: utf-8 -*
# Copyright: [CUP] - See LICENSE for details.
# Authors: Guannan Ma (@mythmgn),
"""
:description:
heartbeat related module
"""
from __future__ import print_function
import time
import pickle
import platform
import threading
import io
from cup import log
from cup import net
from cup.util import conf
from cup import platforms as plat
# Only check when it's run under py3
safe_builtins = {
'range',
'complex',
'set',
'frozenset',
'slice',
}
[docs]class RestrictedUnpickler(pickle.Unpickler):
[docs] def find_class(self, module, name):
"""
Only allow safe classes from builtins
Only check builtins when it's run under py 3
"""
if (3, 0) <= sys.version_info <= (4, 0):
import builtins
if module == "builtins" and name in safe_builtins:
return getattr(builtins, name)
"""Forbid everything else"""
raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
(module, name))
else:
pass
[docs]def restricted_loads(s):
"""Helper function analogous to pickle.loads()"""
return RestrictedUnpickler(io.BytesIO(s)).load()
if plat.is_linux():
from cup.res import linux
elif plat.is_mac():
from cup.res import mac
try:
# pylint: disable=W0611
import Queue as queue
except ImportError:
# pylint: disable=F0401
import queue
[docs]class Device(object):
"""
Base class for all devices in heartbeat service
"""
def __init__(self, name):
"""
:param:
the name of the device
"""
self._name = name
self._last_healthy = time.time()
self._dict_info = None
[docs] def get_last_healthy(self):
"""
get last_healthy time of the device
"""
return self._last_healthy
[docs] def set_last_healthy(self):
"""
set last_healthy time
"""
log.debug('device %s set_last_healthy' % self._name)
self._last_healthy = time.time()
[docs] def serilize(self):
"""
serilize device info
"""
return pickle.dumps(self._dict_info)
[docs] def deserilize(self, binary):
"""
deserilize it from binary
"""
try:
self._dict_info = pickle.loads(pickle.loads(restricted_loads(binary)))
return True
# pylint: disable=W0703
except Exception as error:
log.warn('deserilize linux device error, msg:%s' % error)
return False
[docs] def get_dict_resinfo(self):
"""
get dict of resource info
"""
return self._dict_info
[docs] def get_name(self):
"""get name"""
return self._name
[docs]class LinuxHost(Device):
"""
a linux host resource
"""
def __init__(self, name, init_this_host=False, iface='eth0', port=0):
"""
:param name:
name of the LinuxHost
:param init_this_host:
if init_this_host is True, will initialize the object by this linux
. Otherwise, you need to initialize it by yourself.
:exception socket.gaierror :
if we cannot get the ip of the host, the object construction
may raise socket.gaierror exception.
You have to code {try: catch socket.gaierror as err:}
"""
Device.__init__(self, name)
# -1 means initialized
self._dict_info = {
'iface': iface,
'ipaddr': '0.0.0.0',
'port': 0,
'hostname': net.get_local_hostname(),
'cpu_idle': -1,
'mem_inuse': -1, # MB
'mem_total': -1,
'net_in': -1, # kb
'net_out': -1 # kb
}
if init_this_host:
self._dict_info['ipaddr'] = net.get_hostip()
self._dict_info['port'] = port
cpuinfo = linux.get_cpu_usage(1)
meminfo = linux.get_meminfo()
self._dict_info['net_in'] = linux.get_net_recv_speed(
self._dict_info['iface'], 1
)
self._dict_info['net_out'] = linux.get_net_transmit_speed(
self._dict_info['iface'], 1
)
# pylint: disable=E1101
self._dict_info['cpu_idle'] = cpuinfo.idle
# pylint: disable=E1101
self._dict_info['mem_inuse'] = meminfo.total - meminfo.free
self._dict_info['mem_total'] = meminfo.total
[docs] def set_linux_res_bydict(self, info_dict):
"""
:param info_dict:
::
{
'iface': 'eth0',
'ipaddr': '10.10.10.1',
'port': 8089,
'cpu_idle': 50,
'mem_inuse': 1024, # MB
'mem_total': 8192,
'net_in': 8192, # kb
'net_out': 102400, # kb
}
"""
for key in info_dict:
if key not in self._dict_info:
log.warn('does not have this key:%s, ignore' % key)
continue
self._dict_info[key] = info_dict[key]
log.debug('linux info:%s updated, %s' % (key, info_dict[key]))
[docs] def set_ip_port(self, ipaddr):
"""
set ip information
:param ipaddr:
ipaddr should be string and something like 10.10.10.1
"""
self._dict_info['ipaddr'] = ipaddr
[docs] def get_ip(self):
"""
return ip information
"""
return self._dict_info['ipaddr']
[docs] def get_ip_port(self):
"""
return ip:port
"""
return self._dict_info['ipaddr'] + ':' + str(self._dict_info['port'])
[docs] def set_cpu_idle(self, idle_rate):
"""
set cpu idle rate
"""
self._dict_info['cpu_idle'] = idle_rate
[docs] def get_cpu_idle(self):
"""
get cpu idle rate
"""
return self._dict_info['cpu_idle']
[docs] def set_mem_usage(self, mem_inuse, mem_total):
"""
set up mem_inuse and mem_total.
Will update any of them if it is not None.
"""
if mem_inuse is not None:
self._dict_info['mem_inuse'] = mem_inuse
if mem_total is not None:
self._dict_info['mem_total'] = mem_total
[docs] def get_mem_info(self):
"""
:return:
(mem_inuse, mem_total), in MB
"""
return (self._dict_info['mem_inuse'], self._dict_info['mem_total'])
[docs] def set_net_usage(self, net_in, net_out):
"""
:param net_in:
net_in in kB/s. If net_in is None, will update nothing.
:param net_out:
net_out in kB/s. If net_out is None, will update nothing.
"""
if net_in is not None:
self._dict_info['net_in'] = net_in
if net_out is not None:
self._dict_info['net_out'] = net_out
[docs] def get_net_usage(self):
"""
:return:
(net_in, net_out)
"""
return (self._dict_info['net_in'], self._dict_info['net_out'])
[docs]class MacHost(Device):
"""
a mac host
"""
def __init__(self, name, init_this_host=False, iface='en01', port=0):
"""
:param name:
name of the LinuxHost
:param init_this_host:
if init_this_host is True, will initialize the object by this linux
. Otherwise, you need to initialize it by yourself.
:exception socket.gaierror :
if we cannot get the ip of the host, the object construction
may raise socket.gaierror exception.
You have to code {try: catch socket.gaierror as err:}
"""
Device.__init__(self, name)
self._dict_info = {
'iface': iface,
'ipaddr': '0.0.0.0',
'port': 0,
'hostname': net.get_local_hostname(),
'cpu_idle': -1,
'mem_inuse': -1, # MB
'mem_total': -1,
'net_in': -1, # kb
'net_out': -1 # kb
}
if init_this_host:
self._dict_info['ipaddr'] = net.get_hostip()
self._dict_info['port'] = port
cpuinfo = mac.get_cpu_usage(1)
meminfo = mac.get_meminfo()
self._dict_info['net_in'] = mac.get_net_recv_speed(
self._dict_info['iface'], 1
)
self._dict_info['net_out'] = mac.get_net_transmit_speed(
self._dict_info['iface'], 1
)
# pylint: disable=E1101
self._dict_info['cpu_idle'] = cpuinfo.idle
# pylint: disable=E1101
self._dict_info['mem_inuse'] = meminfo.total - meminfo.free
self._dict_info['mem_total'] = meminfo.total
# class Process(LinuxHost):
# def __init__(self, procname, path):
[docs]class HeartbeatService(object):
"""
HeartBeat service
"""
def __init__(self, judge_lost_in_sec, keep_lost=False):
"""
:param judge_lost_in_sec:
if you call function get_lost() and find that time.time()
minus last_healthy time of the device > judge_lost_in_sec,
the device will be marked as lost.
:param keep_lost:
whether we store lost deivce info
"""
self._lock = threading.Lock()
self._judge_lost = judge_lost_in_sec
self._devices = {}
if keep_lost:
self._lost_devices = {}
else:
self._lost_devices = None
[docs] def is_device_registered(self, key, including_dead=False):
"""
tell if the device is registered
"""
ret = False
self._lock.acquire()
if key in self._devices:
ret = True
if not ret and including_dead and self._lost_devices is not None \
and key in self._lost_devices:
ret = True
self._lock.release()
return ret
[docs] def adjust_judge_lost_time(self, time_in_sec):
"""
adjust judge_lost_in_sec
"""
self._lock.acquire()
log.info(
'heartbeat service judge_lost_in_sec changed, old %d, new %d' % (
self._judge_lost, time_in_sec
)
)
self._lock.release()
self._judge_lost = time_in_sec
return
[docs] def refresh(self, key, device_obj=None):
"""
:param key:
refresh the device by key
:return:
if key does not exist, return False
else, fresh the last_healthy time of the device
"""
assert type(key) == str, 'needs to be a str'
self._lock.acquire()
got_device = self._devices.get(key)
if got_device is None:
log.info(
'New device found:%s. To add it into heartbeat service'
% key
)
new_device = Device(key)
new_device.set_last_healthy()
self._devices[key] = new_device
else:
if device_obj is None:
got_device.set_last_healthy()
log.info(
'Heartbeat: Device %s only refreshed with heartbeat. '
'Resource not refreshed' % key
)
else:
log.info(
'Heartbeat: Device %s refreshed with resource. '
% key
)
self._devices[key] = device_obj
device_obj.set_last_healthy()
self._lock.release()
[docs] def get_lost(self):
"""
get lost devices
"""
now = time.time()
lost_devices = []
self._lock.acquire()
for dkey in self._devices.keys():
device = self._devices[dkey]
if now - device.get_last_healthy() > self._judge_lost:
if self._lost_devices is not None:
self._lost_devices[dkey] = device
del self._devices[dkey]
lost_devices.append(device)
log.warn('heartbeat lost, device:%s' % dkey)
self._lock.release()
return lost_devices
[docs] def cleanup_oldlost(self, dump_file=None):
"""
cleanup old lost devices.
:param dump_file:
if dump_file is not None, we will store devices info into dump_file
Otherwise, we will cleanup the lost devices only.
"""
self._lock.acquire()
log.info('start - empty_lost devices, dump_file:%s' % dump_file)
if self._lost_devices is None:
log.info('end - does not keep_lost devices, return')
self._lock.release()
return
if dump_file is None:
self._lost_devices = {}
log.info('end - does not have dump_file, return')
self._lock.release()
return
info_dict = {}
info_dict['devices'] = {}
if len(self._lost_devices) != 0:
info_dict['devices']['lost'] = []
info_dict['devices']['lost_num'] = len(self._lost_devices)
else:
info_dict['devices']['lost_num'] = 0
for dkey in self._lost_devices.keys():
try:
tmp_dict = {}
tmp_dict['key'] = dkey
tmp_dict['last_healthy'] = self._devices[dkey].get_last_healthy(
)
del self._lost_devices[dkey]
log.info('end - empty_lost devices')
info_dict['devices']['lost'].append(tmp_dict)
except KeyError as error:
log.warn('failed to dump lost_file, error:%s' % str(error))
conf_writer = conf.Dict2Configure(info_dict)
conf_writer.write_conf(dump_file)
self._lock.release()
return
def _test():
localhost = LinuxHost(name='localhost', init_this_host=True)
binary = localhost.serilize()
print('binary:{0}'.format(binary))
restricted_loads(binary)
print(pickle.loads(binary))
if __name__ == '__main__':
_test()
# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent