Source code for cup.cache

#!/usr/bin/env python
# -*- coding: utf-8 -*
# Copyright: [CUP] - See LICENSE for details.
# Authors: Guannan Ma (@mythmgn),
"""
:description:
    decorators related module
"""
import time
import uuid
try:
    import Queue as queue
except ImportError:
    import queue
import collections
import contextlib

import cup
from cup import log
from cup import err
from cup.util import thread


__all__ = ['CacheFull', 'KVCache', 'KvCache']


[docs]class CacheFull(err.BaseCupException): """ CacheFull for cache.KvCache """ def __init__(self, msg): err.BaseCupException.__init__(self, msg)
[docs]class KVCache(object): """ Key-Value Cache object. You can use function set/get to access KeyValue Cache. When a k-v is hit by function **get**, the expire_sec will be expanded to 2 * (expire_sec) """ _STAT = collections.namedtuple( 'kvcache_stat', 'key_num expired_num' ) INFINITE_TIME = 10000 * 365 * 24 * 60 * 60 # 10000 years, enough for cache TIME_EXTENSION = 5 * 60 # 5 mins def __init__(self, name=None, maxsize=0, time_extension=None): """ :param maxsize: 0 by default which means store as more cache k/v as the system can :param time_extension: When a cache item has been hit, the expire_time will be refreshed to the greater one, either (TIME_EXTENSION + time.time() or (TIME_EXTENSION + expire_sec) """ if name is not None: self._name = name else: self._name = 'cache.noname.{0}'.format(uuid.uuid4()) log.warn( 'You initialize the KVCache with no name. Strongly suggest' 'you pick up a meaningful name for it in order to debug' ) self._sorted_keys = queue.PriorityQueue(maxsize=maxsize) self._maxsize = maxsize self._kv_data = {} self._lock = thread.RWLock() if time_extension is None: self._time_extension = self.TIME_EXTENSION else: self._time_extension = time_extension
[docs] def set_time_extension(self, time_extension): """set time extension""" if time_extension <= 0: raise ValueError('time extension should > 0') log.info('KVCache set time extension to {0}'.format(time_extension)) self._time_extension = time_extension
@contextlib.contextmanager def _lock_release(self, b_rw_lock): if b_rw_lock is True: self._lock.acquire_writelock() else: self._lock.acquire_readlock() try: yield # pylint: disable=W0703 except Exception as error: cup.log.warn('something happend in cache:%s' % error) finally: if b_rw_lock is True: self._lock.release_writelock() else: self._lock.release_readlock()
[docs] def set(self, kvdict, expire_sec=None): """ set cache with kvdict :: { 'key1': 'value1', 'key2': 'value2', .... } :param kvdict: kvdict is a dict that contains your cache. :param expire_sec: if expire_sec is None, the cache will never expire. :return: True if set cache successfully. False otherwise. """ if all([ self._maxsize != 0, len(kvdict) > self._maxsize ]): log.error( 'KVCache {0} cannot insert more ' 'elements than the maxsize'.format(self._name) ) return False expire_value = None if expire_sec is not None and expire_sec != self.INFINITE_TIME: expire_value = expire_sec + time.time() else: expire_value = self.INFINITE_TIME with self._lock_release(b_rw_lock=True): for key in kvdict: if key in self._kv_data: cup.log.debug( 'KVCache: Key:{0} updated.'.format(key) ) self._kv_data[key] = (expire_value, kvdict[key]) continue if not self._heapq_newset(key, kvdict[key], expire_value): return False return True
def _heapq_newset(self, key, value, expire_value): """ headp set """ if any([ self._maxsize == 0, len(self._kv_data) < self._maxsize ]): # no limit, just insert it into the queue self._sorted_keys.put((expire_value, key)) self._kv_data[key] = (expire_value, value) return True else: # need replace the smallest one while True: try: pop_value = self._sorted_keys.get_nowait() except queue.Full: return False real_value = self._kv_data.get(pop_value[1], None) # key exipred, key deleted in self._kv_data if real_value is None: self._kv_data[key] = (expire_value, value) self._sorted_keys.put((expire_value, key)) return True if real_value[0] > pop_value[0]: # resort, adjust real self._sorted_keys.put((expire_value, key)) else: if expire_value < pop_value[0]: log.error( 'KVCache {0} the alorithm you design has faults ' 'the new inserted cache {1} expire time ' '< the oldest cache {2} in it'.format( self._name, (key, expire_value), pop_value ) ) return False del self._kv_data[pop_value[1]] self._kv_data[key] = (expire_value, value) self._sorted_keys.put((expire_value, key)) break return True def _get_refreshed_exipre_time(self, expire_sec): new_refresh = time.time() + self._time_extension new_expire = expire_sec + self._time_extension return new_expire if new_expire < new_refresh else new_refresh
[docs] def get(self, key): """ Get your cache with key. If the cache is expired, it will return None. If the key does not exist, it will return None. """ with self._lock_release(b_rw_lock=False): if key not in self._kv_data: return None expire_sec, value = self._kv_data[key] if time.time() > expire_sec: log.info('KVCache {0}: key {1} hit, but exipred {1}'.format( self._name, key )) del self._kv_data[key] return None log.debug('key:%s of kvCache fetched.' % key) expire_sec = self._get_refreshed_exipre_time(expire_sec) self._kv_data[key] = (expire_sec, value) return value
[docs] def pop_n_expired(self, num=0): """ :param num: if num is 0, will get all expired key/values :return: A dict. Return expired items. Return type is a dict :: { 'key' : (value, expire_time) } """ kvlist = {} nowtime = time.time() allexpire = True if num == 0 else False with self._lock_release(b_rw_lock=False): while True: try: pop_value = self._sorted_keys.get_nowait() except queue.Full: break real_value = self._kv_data.get(pop_value[1], None) # has already been deleted if real_value is None: continue if real_value[0] > pop_value[0]: # resort, adjust real self._sorted_keys.put((real_value[0], pop_value[1])) else: if real_value[0] > nowtime: break else: kvlist[pop_value[1]] = real_value del self._kv_data[pop_value[1]] if not allexpire: num -= 1 if not allexpire and num <= 0: break return kvlist
[docs] def size(self): """ :return: cached item size """ return len(self._kv_data)
[docs] def clear(self): """ remove all kv cache inside. """ with self._lock_release(b_rw_lock=True): del self._kv_data self._kv_data = {} del self._sorted_keys self._sorted_keys = queue.PriorityQueue(self._maxsize)
# for compatibility KvCache = KVCache # vi:set tw=0 ts=4 sw=4 nowrap fdm=indent