Source code for cup.services.buffers
#!/usr/bin/env python
# -*- coding: utf-8 -*
# Copyright: [CUP] - See LICENSE for details.
# Authors: Guannan Ma (@mythmgn),
"""
:description:
buffer pool
"""
import threading
from cup import platforms
SMALL_BLOCK_SIZE = 4096 # 4kb
MEDIUM_BLOCK_SIZE = (128 + 4) * 1024 # (128 + 4) K
LARGE_BLOCK_SIZE = 1 * 1024 * 1024 + SMALL_BLOCK_SIZE # 1M + 4K
SMALL_BLOCK = 0
MEDIUM_BLOCK = 1
LARGE_BLOCK = 2
[docs]class Buffer(object):
"""
Buffer object which you get from BufferPool.allocate(num).
A **Buffer** consists of several bytearrays which is mutable compared to
a normal **str**. In other words, if you have senarios like: allocte mem
and deallocte mem frenquently. With high chance you can replace directly
using str by Buffer. It will reduce the memory fragments.
"""
def __init__(self, items, block_size, uniqid):
self._items = items
self._block_size = block_size
self._num = len(items)
self._length = 0
self._uniqid = uniqid
self._maxsize = block_size * self._num
[docs] def set(self, content, encoding='utf8'):
"""
set content to buffers
:return:
return (True, None) if succeed.
return (False, error_msg) otherwise
"""
if platforms.is_py3():
content = content.encode(encoding)
length = len(content)
ind = 0
item_ind = 0
if length > self._block_size * self._num:
return (False, 'content size > Buffer size')
while ind < length:
if (ind + self._block_size) <= (length - 1):
loop_size = self._block_size
else:
loop_size = length - ind
self._items[item_ind].extend(content[ind: loop_size])
item_ind += 1
ind += loop_size
self._length = length
return (True, None)
[docs] def get(self):
"""
return (True, (blocks, block_size, total_length, encoding)) if succeed
Otherwise, return (False, err_msg, None)
"""
rev = None
if self._length != 0:
rev = (True, (self._items, self._block_size, self._length))
else:
rev = (False, ('not initialized yet', self._block_size, None))
return rev
[docs] def get_uniq_id(self):
"""
return the uniqid for this object
"""
return self._uniqid
[docs] def get_byte_arrays(self):
"""
get byte arrays in the buffer
"""
return self._items
[docs] def maxsize(self):
"""return how many unicode/str you can set to the buffer"""
return self._maxsize
[docs] def length(self):
"""return the length you have used for the buffer"""
return self._length
# pylint: disable=R0902
[docs]class BufferPool(object):
"""
Buffer pool class which will ease memory fragment.
"""
def __init__(
self, pool_size, block_size=MEDIUM_BLOCK_SIZE, extendable=False
):
if block_size not in (
SMALL_BLOCK_SIZE, MEDIUM_BLOCK_SIZE, LARGE_BLOCK_SIZE
):
raise ValueError(
'block_size should be buffers.SMALL_BLOCK_SIZE'
' or buffers.MEDIUM_BLOCK_SIZE or buffers.LARGE_BLOCK_SIZE'
)
# TODO If extendable, we should expand the pool
self._extendable = extendable
self._lock = threading.Lock()
self._free_list = []
self._used_dict = {}
self._pool_size = pool_size
self._used_num = 0
self._free_num = self._pool_size - self._used_num
self._block_size = block_size
self._uniqid = 0
self._block_size = block_size
for _ in range(0, self._pool_size):
self._free_list.append(bytearray(self._block_size))
[docs] def allocate(self, num):
"""
acclocate buff with num * block_size
:return:
(True, Buffer object)
(False, str_error_msg)
"""
ret = None
self._lock.acquire()
if num > self._free_num:
ret = (False, 'not enough free buffer available')
else:
uniqid = threading.current_thread().ident
ind = self._free_num - num
buff = Buffer(
self._free_list[ind: self._free_num],
self._block_size,
uniqid
)
ret = (True, buff)
self._used_dict[uniqid] = ret[1]
del self._free_list[ind: self._free_num]
self._free_num -= num
self._used_num += num
self._lock.release()
return ret
[docs] def deallocate(self, buff):
"""
return the acclocated buff back to the pool
"""
if buff is None:
return False
self._lock.acquire()
uniqid = buff.get_uniq_id()
if uniqid not in self._used_dict:
raise ValueError('this buff is not in the pool!!!')
byte_arrays = buff.get_byte_arrays()
length = len(byte_arrays)
self._free_list.extend(byte_arrays)
del self._used_dict[uniqid]
del buff
self._used_num -= length
self._free_num += length
self._lock.release()
return True
# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent