Source code for cup.net.route

#!/usr/bin/env python
# -*- coding: utf-8 -*
# Copyright: [CUP] - See LICENSE for details.
# Authors: YangGuang
"""
:description:
    host route info related module
"""
from __future__ import print_function
import copy
import json
import socket
import struct


__all__ = ['RouteInfo']


class RouteInfo(object):
    """
    Handler of Route Info for Linux system, for ipv4 only.

    The routing table is read once from :attr:`ROUTE_FILE` when the object
    is constructed; later kernel-side changes are not picked up.

    *E.g.*
    ::

        from cup.net import route
        ri = route.RouteInfo()
        print(json.dumps(ri.get_route_by_ip('10.32.19.92'), indent=1))
        print(json.dumps(ri.get_routes(), indent=1))

    *Return*
    ::

        {
         "Use": "0",
         "Iface": "eth1",
         "Metric": "0",
         "Destination": "10.0.0.0",
         "Mask": "255.0.0.0",
         "RefCnt": "0",
         "MTU": "0",
         "Window": "0",
         "Gateway": "10.226.71.1",
         "Flags": "0003",
         "IRTT": "0"
        }
        [
         {
          "Use": "0",
          "Iface": "eth1",
          "Metric": "0",
          "Destination": "10.226.71.0",
          "Mask": "255.255.255.0",
          "RefCnt": "0",
          "MTU": "0",
          "Window": "0",
          "Gateway": "0.0.0.0",
          "Flags": "0001",
          "IRTT": "0"
         },
         {
          "Use": "0",
          "Iface": "eth1",
          "Metric": "0",
          "Destination": "169.254.0.0",
          "Mask": "255.255.0.0",
          "RefCnt": "0",
          "MTU": "0",
          "Window": "0",
          "Gateway": "0.0.0.0",
          "Flags": "0001",
          "IRTT": "0"
         },
         {
          "Use": "0",
          "Iface": "eth1",
          "Metric": "0",
          "Destination": "192.168.0.0",
          "Mask": "255.255.0.0",
          "RefCnt": "0",
          "MTU": "0",
          "Window": "0",
          "Gateway": "10.226.71.1",
          "Flags": "0003",
          "IRTT": "0"
         },
         {
          "Use": "0",
          "Iface": "eth1",
          "Metric": "0",
          "Destination": "172.16.0.0",
          "Mask": "255.240.0.0",
          "RefCnt": "0",
          "MTU": "0",
          "Window": "0",
          "Gateway": "10.226.71.1",
          "Flags": "0003",
          "IRTT": "0"
         },
         {
          "Use": "0",
          "Iface": "eth1",
          "Metric": "0",
          "Destination": "10.0.0.0",
          "Mask": "255.0.0.0",
          "RefCnt": "0",
          "MTU": "0",
          "Window": "0",
          "Gateway": "10.226.71.1",
          "Flags": "0003",
          "IRTT": "0"
         }
        ]
    """
    ROUTE_FILE = '/proc/net/route'

    # Column names of one routing-table row, in the order they are parsed.
    _FIELDS = (
        'Iface', 'Destination', 'Gateway', 'Flags', 'RefCnt', 'Use',
        'Metric', 'Mask', 'MTU', 'Window', 'IRTT',
    )
    _ASCII_DIGITS = '0123456789'

    def __init__(self):
        self._raw = []
        self._init_proc_info()

    @staticmethod
    def _ip2int(ip):
        """
        change ip address to integer

        :param ip:
            ip address in type of string (dotted quad)
        :return:
            the address as an unsigned 32-bit integer, network byte order
        """
        return struct.unpack("!I", socket.inet_aton(ip))[0]

    @staticmethod
    def _int2ip(dec):
        """
        change integer to ip address

        :param dec:
            unsigned 32-bit integer, network byte order
        :return:
            ip address in type of string (dotted quad)
        """
        return socket.inet_ntoa(struct.pack("!I", dec))

    @staticmethod
    def _ip_check(ipaddr):
        """
        check whether ipaddr is a dotted-quad ipv4 address

        :param ipaddr:
            candidate ip address in type of string
        :return:
            True if ipaddr has exactly four ASCII-decimal fields, each
            within [0, 255]; False otherwise
        """
        fields = ipaddr.split('.')
        if len(fields) != 4:
            return False
        digits = RouteInfo._ASCII_DIGITS
        for field in fields:
            # Only plain ASCII digits: str.isdigit() also accepts other
            # unicode digits that socket.inet_aton() rejects later on.
            if not field or any(ch not in digits for ch in field):
                return False
            if int(field) > 255:
                return False
        return True

    def _init_proc_info(self):
        """
        read routeinfo from /proc/net/route, and parse it to dict

        this fun will be called when __init__. The header line and any
        line that does not have exactly 11 tab-separated fields are
        skipped.
        """
        route_info = []
        with open(self.ROUTE_FILE, 'r') as fd:
            for line in fd:
                if line.startswith('Iface\t'):
                    continue
                items = line.split('\t')
                if len(items) != len(self._FIELDS):
                    continue
                # The last column carries the newline and trailing spaces.
                items[-1] = items[-1].strip('\n').rstrip(' ')
                route_info.append(dict(zip(self._FIELDS, items)))
        self._raw = route_info

    def _raw2view(self, r):
        """
        change raw route_info to be readable

        :param r:
            raw route_info (addresses as hex strings as read from
            ROUTE_FILE)
        :return:
            a new dict with Destination, Gateway and Mask converted to
            dotted-quad strings; the input dict is not modified
        """
        # All values are strings, so a shallow copy fully detaches the dict.
        res = dict(r)
        for key in ('Destination', 'Gateway', 'Mask'):
            res[key] = self._int2ip(socket.ntohl(int(r[key], 16)))
        return res

    def get_routes(self):
        """
        get all the route_info of this host

        :return:
            all the readable route_info of this host
        """
        return [self._raw2view(r) for r in self._raw]

    def get_interface_by_ip(self, ip):
        """
        get the interface which can reach to the ip

        :param ip:
            destination ip
        :return:
            interface name which can reach to the ip. None if failed.
        """
        route_info = self.get_route_by_ip(ip)
        if route_info is None:
            return None
        return route_info['Iface']

    def get_route_by_ip(self, ip):
        """
        get the route_info which can reach to the ip address

        Among the routes whose network contains the ip, the one with the
        longest netmask wins; ties are broken by the lowest Metric, and
        remaining ties by table order (first entry wins).

        :param ip:
            destination ip address
        :return:
            route_info in type of dict. None if ip is not a valid ipv4
            address or no route matches.
        """
        if not self._ip_check(ip):
            return None
        # Raw table values are hex strings of the address as a host-order
        # integer, so convert the target into the same representation.
        target = socket.ntohl(self._ip2int(ip))
        best = None
        best_rank = None
        for r in self._raw:
            mask = int(r['Mask'], 16)
            if int(r['Destination'], 16) != target & mask:
                continue
            # Rank by prefix length (number of set mask bits), then by
            # lower metric; counting bits is independent of byte order.
            rank = (bin(mask).count('1'), -int(r['Metric']))
            if best is None or rank > best_rank:
                best = r
                best_rank = rank
        if best is None:
            return None
        return self._raw2view(best)

    def get_interfaces(self):
        """get all the interface of this host"""
        return list(set(r['Iface'] for r in self._raw))
def _test():
    """
    Manual smoke test: print the route chosen for a sample address and
    then the full routing table, both as indented JSON.
    """
    route_info = RouteInfo()
    matched = route_info.get_route_by_ip('10.32.19.92')
    print(json.dumps(matched, indent=1))
    everything = route_info.get_routes()
    print(json.dumps(everything, indent=1))


if __name__ == '__main__':
    _test()