Source code for cup.exfile
#!/usr/bin/env python
# -*- coding: utf-8 -*
# Copyright: [CUP] - See LICENSE for details.
# Authors: Guannan Ma (@mythmgn),
"""
:description:
file related functions
"""
import os
import sys
import copy
import shutil
from cup import err
from cup import decorators
from cup import platforms
__all__ = [
'LockFile', 'FILELOCK_SHARED', 'FILELOCK_EXCLUSIVE',
'FILELOCK_NONBLOCKING', 'FILELOCK_UNLOCK', 'mk_newnode', 'safe_rmtree',
'safe_delete'
]
CANNOT_DEL_PATHLIST = [
'/',
'/proc',
'/boot',
'/sys'
]
if platforms.is_linux() or platforms.is_mac():
import fcntl
FILELOCK_EXCLUSIVE = fcntl.LOCK_EX
FILELOCK_SHARED = fcntl.LOCK_SH
FILELOCK_NONBLOCKING = fcntl.LOCK_NB
FILELOCK_UNLOCK = fcntl.LOCK_UN
elif platforms.is_windows():
import msvcrt
def file_size(fobj):
"""win file size"""
return os.path.getsize(os.path.realpath(fobj.name) )
def win_lockfile(fobj, blocking=True):
"""win lock file"""
flags = msvcrt.LK_RLCK
if not blocking:
flags = msvcrt.LK_NBRLCK
msvcrt.locking(fobj.fileno(), flags, file_size(fobj))
def win_unlockfile(fobj):
"""win unlock file"""
msvcrt.locking(fobj.fileno(), msvcrt.LK_UNLCK, file_size(fobj))
[docs]class LockFile(object):
"""
Lock file in order to prevent others from trying to lock it again
Code Example:
::
from cup import exfile
filelock = exfile.LockFile()
# xxxx do something
filelock.lock(blocking=True)
# xxxxx do something else
filelock.unlock()
"""
def __init__(self, fpath, locktype=FILELOCK_EXCLUSIVE):
"""
exclusive lockfile, by default.
Notice that the file CANNOT exist before you intialize a LockFile obj.
Otherwise, it will raise cup.err.LockFileError
Plz notice that on windows, cup only support EXCLUSIVE lock
:raise:
cup.err.LockFileError if we encounter errors
"""
self._fpath = fpath
self._locktype = locktype
self._fhandle = None
try:
self._fhandle = os.open(
self._fpath, os.O_CREAT | os.O_RDWR
)
except IOError as error:
raise err.LockFileError(error)
except OSError as error:
raise err.LockFileError(error)
except Exception as error:
raise err.LockFileError(
'catch unkown error type:{0}'.format(error)
)
def __del__(self):
"""del the instance"""
try:
if self._fhandle is not None:
os.close(self._fhandle)
# pylint: disable=W0703
except Exception as error:
sys.stderr.write('failed to close lockfile:{0}, msg:{1}'.format(
self._fpath, error)
)
sys.stderr.flush()
[docs] @decorators.needposix
def lock(self, blocking=True):
"""
lock the file
:param blocking:
If blocking is True, will block there until cup gets the lock.
True by default.
:return:
return False if locking fails
:raise Exception:
raise cup.err.LockFileError if blocking is False and
the lock action failed
"""
if platforms.is_linux() or platforms.is_mac():
flags = 0x1
if FILELOCK_SHARED == self._locktype:
flags = FILELOCK_SHARED
elif FILELOCK_EXCLUSIVE == self._locktype:
flags = FILELOCK_EXCLUSIVE
else:
raise err.LockFileError('does not support this lock type')
if not blocking:
flags |= FILELOCK_NONBLOCKING
ret = None
try:
ret = fcntl.flock(self._fhandle, flags)
except IOError as error:
raise err.LockFileError(error)
except Exception as error:
raise err.LockFileError(error)
return ret
elif platforms.is_windows():
win_lockfile(self._fhandle, blocking)
[docs] def unlock(self):
"""unlock the locked file"""
if platforms.is_linux() or platforms.is_mac():
try:
fcntl.flock(self._fhandle, FILELOCK_UNLOCK)
except Exception as error:
raise err.LockFileError(error)
elif platforms.is_windows():
win_unlockfile(self._fhandle)
[docs] def filepath(self):
"""
return filepath
"""
return self._fpath
[docs]def mk_newnode(abspath, check_exsistence=False):
"""
make new file node for abspath
:param abspath:
plz use absolute path. Not relative path
:param check_exsistence:
if True, will check if the abspath existence (
raise IOError if abspath exists)
:raise Exception:
IOError
"""
if check_exsistence:
if os.path.exists(abspath):
raise IOError('{0} already exists'.format(abspath))
with open(abspath, 'w+') as _:
pass
[docs]def safe_rmtree(abspath, not_del_list=None):
"""
the function will safely remove files/dirs of abspath with not_del_list
excluded
:param abspath:
pass in absolute path
:param not_del_list:
cannot del path list
:raise Exception:
ValueError, if abspath is in exfile.CANNOT_DEL_PATHLIST or not_del_list
IOError, if cup encounters any problem
"""
normpath = os.path.normpath(abspath)
tmplist = copy.deepcopy(CANNOT_DEL_PATHLIST)
if not_del_list is not None:
tmplist.extend(not_del_list)
if abspath in tmplist:
raise ValueError(
'cannot delete path in {0}'.format(tmplist)
)
shutil.rmtree(normpath)
[docs]def safe_delete(abspath, not_del_list):
"""
the function will safely delete file/object of abspath.
If the abspath in not_del_list, safe delete will raise ValueError
:param abspath:
pass in absolute path
:param not_del_list:
cannot del path list
:raise Exception:
ValueError, if abspath is in exfile.CANNOT_DEL_PATHLIST or not_del_list
IOError, if cup encounters any problem
"""
normpath = os.path.normpath(abspath)
if normpath in not_del_list:
raise ValueError('cannot delete files in {0}'.format(not_del_list))
if os.path.isdir(abspath):
safe_rmtree(abspath, not_del_list)
else:
os.unlink(abspath)
# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent