#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright: [CUP] - See LICENSE for details.
# Authors: Guannan Ma (@mythmgn),
"""
:description:
Object related storage
"""
import os
import abc
import time
import shutil
import ftplib
import traceback
import logging
from cup import log
from cup import err
# Names exported by ``from <this module> import *``; ObjectInterface is
# intentionally left out of the list.
__all__ = [
'AFSObjectSystem', 'S3ObjectSystem', 'FTPObjectSystem',
'LocalObjectSystem'
]
class ObjectInterface(object):
    """
    object interface, abstract class. Should not be used directly.

    Every concrete object system in this module derives from it and
    implements put/delete/get/head/mkdir/rmdir/rename.
    """
    # NOTE: the ``__metaclass__`` attribute is only honoured by Python 2.
    # Python 3 ignores it, so the abstract methods below are not enforced
    # by the interpreter there.
    __metaclass__ = abc.ABCMeta

    def __init__(self, config):
        """
        :param config:
            dict like config, should contain at least
            ::

                {
                    'uri': 'xxxx',
                    'user': 'xxxx',       # or stands for accesskey
                    'passwords': 'xxxx',  # or stands for secretkey
                    'extra': some_object
                }
        """
        self._config = config

    def _validate_config(self, config, keys):
        """
        validate config if there's any missing items

        :param config:
            container supporting the ``in`` operator
        :param keys:
            iterable of required keys
        :return:
            True if every key of keys is in config, False otherwise
        """
        return all(key in config for key in keys)

    @abc.abstractmethod
    def put(self, dest, localfile):
        """
        :param dest:
            system path
        :param localfile:
            localfile

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """

    @abc.abstractmethod
    def delete(self, path):
        """
        delete a file

        :param path:
            object path

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """

    @abc.abstractmethod
    def get(self, path, localpath):
        """
        get the object into localpath

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """

    @abc.abstractmethod
    def head(self, path):
        """
        get the object info

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                    'objectinfo': {
                        'size': 1024,           # at least have this one
                        'atime': 'xxxx.xx.xx',  # optional
                        'mtime': 'xxxx.xx.xx',  # optional
                        'ctime': 'xxxx.xx.xx',  # optional
                        .......
                    }
                }
        """

    @abc.abstractmethod
    def mkdir(self, path, recursive=True):
        """
        mkdir dir of a path

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """

    @abc.abstractmethod
    def rmdir(self, path, recursive=True):
        """
        rmdir of a path

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """

    @abc.abstractmethod
    def rename(self, frompath, topath):
        """rename from path to path"""
class AFSObjectSystem(ObjectInterface):
    """
    AFSObjectSystem implemented interface.

    NOTE: apart from :meth:`rename` (which raises), every method below has
    an empty body and therefore returns None; the documented return dicts
    describe the interface contract, not what this class produces today.
    """
    def __init__(self, config):
        """
        :param config:
            be complied with cup.util.conf.Configure2Dict().get_dict().
            Should be dict like object
        """
        ObjectInterface.__init__(self, config)

    def put(self, dest, localfile):
        """
        :param dest:
            system path
        :param localfile:
            localfile

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """

    def delete(self, path):
        """
        delete a file

        :param path:
            object path

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """

    def get(self, path, localpath):
        """
        get the object into localpath

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """

    def head(self, path):
        """
        get the object info

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                    'objectinfo': {
                        size: 1024,
                        .......
                    }
                }
        """

    # ``recursive`` is accepted so the signature matches
    # ObjectInterface.mkdir; it is not used because the body is empty.
    def mkdir(self, path, recursive=True):
        """
        mkdir dir of a path

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """

    # ``recursive`` is accepted so the signature matches
    # ObjectInterface.rmdir; it is not used because the body is empty.
    def rmdir(self, path, recursive=True):
        """rmdir of a path"""

    def rename(self, frompath, topath):
        """
        rename

        :raise:
            cup.err.NotImplementedYet, always
        """
        raise err.NotImplementedYet('AFSObjectSystem.rename')
# pylint: disable=R0902
# need to have so many instance attributes
class S3ObjectSystem(ObjectInterface):
    """
    s3 object system

    Every public method returns a dict with at least 'returncode'
    (0 for success) and 'msg'.
    """
    def __init__(self, config):
        """
        :param config:
            be complied with cup.util.conf.Configure2Dict().get_dict().
            Should be dict like object with the keys
            'ak', 'sk', 'endpoint' and 'bucket'

        :raise:
            cup.err.ConfigError if there's any config item missing
        """
        ObjectInterface.__init__(self, config)
        required_keys = ['ak', 'sk', 'endpoint', 'bucket']
        if not self._validate_config(self._config, required_keys):
            raise err.ConfigError(str(required_keys))
        self._ak = self._config['ak']
        self._sk = self._config['sk']
        self._endpoint = self._config['endpoint']
        self._bucket = self._config['bucket']
        # imported here rather than at module level, so boto3/botocore are
        # only needed once an S3ObjectSystem is actually created
        import boto3
        from botocore import exceptions
        from botocore import client as coreclient
        self._s3_config = coreclient.Config(
            signature_version='s3v4',
            s3={'addressing_style': 'path'}
        )
        logging.getLogger('boto3').setLevel(logging.INFO)
        logging.getLogger('botocore').setLevel(logging.INFO)
        logging.getLogger('s3transfer').setLevel(logging.INFO)
        log.info('to connect to boto3')
        self.__s3conn = boto3.client(
            's3',
            aws_access_key_id=self._ak,
            aws_secret_access_key=self._sk,
            endpoint_url=self._endpoint,
            # region_name=conf_dict['region_name'],
            config=self._s3_config
        )
        # exception class that the methods below turn into a failure dict
        self._exception = exceptions.ClientError

    def put(self, dest, localfile):
        """
        upload localfile as object dest of the configured bucket

        :param dest:
            system path
        :param localfile:
            localfile

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }

        :raise:
            whatever open() raises if localfile cannot be opened
        """
        ret = {
            'returncode': -1,
            'msg': 'failed to put object'
        }
        # binary mode: the object body has to be uploaded byte for byte
        with open(localfile, 'rb') as fhandle:
            try:
                self.__s3conn.put_object(
                    Key='{0}'.format(dest),
                    Bucket=self._bucket,
                    Body=fhandle
                )
                ret['returncode'] = 0
                ret['msg'] = 'success'
            except self._exception as error:
                ret['returncode'] = -1
                ret['msg'] = str(error)
        return ret

    def delete(self, path):
        """
        delete a file

        :param path:
            object path

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': 0,
            'msg': 'success'
        }
        try:
            self.__s3conn.delete_object(
                Key='{0}'.format(path),
                Bucket=self._bucket
            )
        except self._exception as error:
            ret['returncode'] = -1
            ret['msg'] = str(error)
        return ret

    def get(self, path, localpath):
        """
        get the object into localpath

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': 0,
            'msg': 'success'
        }
        # pylint: disable=W0703
        try:
            # fetch first, so that a failed request does not truncate an
            # already existing local file
            resp = self.__s3conn.get_object(
                Key='{0}'.format(path),
                Bucket=self._bucket
            )
            # binary mode: the body is read as bytes
            with open(localpath, 'wb') as fhandle:
                fhandle.write(resp['Body'].read())
        except Exception as error:
            ret['returncode'] = -1
            ret['msg'] = str(error)
        return ret

    def head(self, path):
        """
        get the object info

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                    'objectinfo': the head_object response of the client
                }

            'objectinfo' is only present on success.
        """
        ret = {
            'returncode': -1,
            'msg': 'failed to get objectinfo'
        }
        try:
            resp = self.__s3conn.head_object(
                Key='{0}'.format(path),
                Bucket=self._bucket
            )
            ret['objectinfo'] = resp
            ret['returncode'] = 0
            ret['msg'] = 'success'
        except self._exception as error:
            ret['returncode'] = -1
            ret['msg'] = str(error)
        return ret

    # ``recursive`` only exists to match ObjectInterface.mkdir
    def mkdir(self, path, recursive=True):
        """
        mkdir dir of a path, not supported

        :raise:
            cup.err.NotImplementedYet, always
        """
        raise err.NotImplementedYet('mkdir not supported for S3ObjectSystem')

    # ``recursive`` only exists to match ObjectInterface.rmdir
    def rmdir(self, path, recursive=True):
        """
        rmdir of a path, not supported

        :raise:
            cup.err.NotImplementedYet, always
        """
        raise err.NotImplementedYet('rmdir not supported for S3ObjectSystem')

    def create_bucket(self, bucket_name):
        """
        create bucket

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': -1,
            'msg': 'failed to create bucket'
        }
        try:
            self.__s3conn.create_bucket(
                Bucket=bucket_name
            )
            ret['returncode'] = 0
            ret['msg'] = 'success'
        except self._exception as error:
            ret['returncode'] = -1
            ret['msg'] = str(error)
        return ret

    def head_bucket(self, bucket_name):
        """
        head bucket

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any',
                    'bucket_info': head_bucket response, None on failure
                }
        """
        ret = {
            'returncode': -1,
            'msg': 'failed to head bucket',
            'bucket_info': None
        }
        try:
            resp = self.__s3conn.head_bucket(
                Bucket=bucket_name
            )
            ret['returncode'] = 0
            ret['msg'] = 'success'
            ret['bucket_info'] = resp
        except self._exception as error:
            ret['returncode'] = -1
            ret['msg'] = str(error)
        return ret

    def _empty_bucket(self, bucket_name):
        """
        delete every object of bucket_name, one listing page at a time

        :return:
            None when all listed objects were deleted, otherwise the
            failure message of the first object that could not be deleted
        """
        list_args = {'Bucket': bucket_name}
        while True:
            res = self.__s3conn.list_objects(**list_args)
            contents = res.get('Contents') or []
            for obj in contents:
                # pylint: disable=W0703
                try:
                    self.__s3conn.delete_object(
                        Bucket=bucket_name,
                        Key=obj['Key']
                    )
                except Exception as error:
                    return 'failed to delete obj in bucket, err:{0}'.format(
                        error
                    )
            if not contents or not res.get('IsTruncated'):
                return None
            # one listing is capped by the service; continue after the last
            # key of this page so that every round makes progress
            list_args['Marker'] = contents[-1]['Key']

    def delete_bucket(self, bucket_name, forcely=False):
        """
        delete bucket

        :param forcely:
            if forcely is True, the bucket will be delete no matter it has
            objects inside. Otherwise, you have to delete items inside,
            then delete the bucket

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': -1,
            'msg': 'failed to delete bucket'
        }
        try:
            if forcely:
                failure = self._empty_bucket(bucket_name)
                if failure is not None:
                    ret['msg'] = failure
                    return ret
            self.__s3conn.delete_bucket(
                Bucket=bucket_name
            )
            ret['returncode'] = 0
            ret['msg'] = 'success'
        except self._exception as error:
            ret['returncode'] = -1
            ret['msg'] = str(error)
        return ret

    def rename(self, frompath, topath):
        """
        rename, not supported

        :raise:
            cup.err.NotImplementedYet, always
        """
        raise err.NotImplementedYet('S3ObjectSystem.rename')
class FTPObjectSystem(ObjectInterface):
    """
    ftp object system, Plz notice all methods of FTPObjectSystem is NOT
    thread-safe! Be careful when you use it in a service of concurrency.

    The methods temporarily change the working directory of the single
    shared ftp connection and switch back to it afterwards.
    """
    def __init__(self, config):
        """
        :param config:
            ::

                {
                    "uri": "ftp://host:port",   # ":port" is optional
                    "user": "username",
                    "passwords": "password",
                    "extra": None   # optional; an int is used as the
                                    # connect timeout (default 30)
                }

        :raise:
            cup.err.ConfigError if there's any config item missing
        """
        ObjectInterface.__init__(self, config)
        required_keys = ['uri', 'user', 'passwords']
        if not self._validate_config(self._config, required_keys):
            raise err.ConfigError(str(required_keys))
        self._uri = self._config['uri']
        self._user = self._config['user']
        self._passwd = self._config['passwords']
        # 'extra' is not a required key, so it may be absent
        self._extra = None
        if 'extra' in self._config:
            self._extra = self._config['extra']
        self._dufault_timeout = 30
        if isinstance(self._extra, int):
            self._dufault_timeout = self._extra
        log.info('to connect to ftp server')
        self._ftp_con = ftplib.FTP()
        self._host, self._port = self._parse_uri(self._uri)
        self._ftp_con.connect(self._host, self._port, self._dufault_timeout)
        self._ftp_con.login(self._user, self._passwd)
        self._last_optime = time.time()
        self._timeout = 15  # idle time for ftp

    @staticmethod
    def _parse_uri(uri):
        """
        split "ftp://host[:port]" into (host, port)

        The port falls back to ftplib.FTP_PORT when the uri has none.
        """
        location = uri.split('://', 1)[-1]
        location = location.split('/', 1)[0]
        host, _, port = location.partition(':')
        if port:
            return host, int(port)
        return host, ftplib.FTP_PORT

    def __del__(self):
        """release connect"""
        # best effort only: the connection may be gone already, or
        # __init__ may have failed before self._ftp_con was set
        # pylint: disable=W0703
        try:
            self._ftp_con.quit()
        except Exception:
            pass

    def _check_timeout(self):
        """
        reconnect if the connection has been idle longer than
        self._timeout, then record the time of this operation
        """
        if time.time() - self._last_optime > self._timeout:
            # pylint: disable=W0703
            try:
                self._ftp_con.quit()
            except Exception:
                # the old connection is being dropped anyway
                pass
            self._ftp_con.connect(
                self._host, self._port, self._dufault_timeout
            )
            self._ftp_con.login(self._user, self._passwd)
        # refreshed on every call, so idle time is counted from the last
        # operation and not from the last (re)connect
        self._last_optime = time.time()

    def _restore_cwd(self, cwd):
        """
        best-effort switch back to cwd after a failed operation

        :return:
            True if the ftp working directory is cwd again
        """
        # pylint: disable=W0703
        try:
            self._ftp_con.cwd(cwd)
            return True
        except Exception as error:
            log.error(
                'failed to switch back to ftp dir {0}, err:{1}'.format(
                    cwd, error
                )
            )
            return False

    def _get_relative_path(self, path, cwd):
        """
        get relative path for real actions

        :return:
            path relative to cwd when path is an absolute path equal to or
            below cwd ('' when both are equal). Any other path is returned
            normalized but otherwise unchanged.
        """
        cwd = os.path.normpath(cwd)
        path = os.path.normpath(path)
        if not path.startswith('/'):
            return path
        if path == cwd:
            return ''
        # match on a directory boundary, so that cwd '/home/a' is not
        # taken for a parent of '/home/ab'
        prefix = cwd if cwd.endswith('/') else cwd + '/'
        if path.startswith(prefix):
            return path[len(prefix):].lstrip('/')
        return path

    def put(self, destfile, localfile):
        """
        :param destfile:
            ftp path for the localfile
        :param localfile:
            localfile

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }

        :raise:
            ValueError if destfile has no file name part;
            whatever open() raises if localfile cannot be opened
        """
        ret = {
            'returncode': 0,
            'msg': 'success'
        }
        log.info('to put localfile {0} to ftp {1}'.format(localfile, destfile))
        self._check_timeout()
        cwd = self._ftp_con.pwd()
        destfile = os.path.normpath(destfile)
        destfile = self._get_relative_path(destfile, cwd)
        rindex = destfile.rfind('/')
        if rindex < 0:
            destdir = cwd
            file_name = destfile
        elif rindex >= (len(destfile) - 1):
            raise ValueError('value error, destfile {0}'.format(
                destfile))
        else:
            # rindex == 0 means the file sits directly under '/'
            destdir = destfile[:rindex] or '/'
            file_name = destfile[rindex + 1:]
        log.info('put localfile {0} into ftp {1}'.format(localfile, destfile))
        with open(localfile, 'rb') as fhandle:
            # pylint: disable=W0703
            try:
                self._ftp_con.cwd(destdir)
                ftp_cmd = 'STOR {0}'.format(file_name)
                self._ftp_con.storbinary(ftp_cmd, fhandle)
            except Exception as error:
                ret['returncode'] = -1
                ret['msg'] = 'failed to put, err:{0}'.format(error)
            finally:
                self._ftp_con.cwd(cwd)
        return ret

    def delete(self, path):
        """
        delete file

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': 0,
            'msg': 'success'
        }
        log.info('to delete ftp file: {0}'.format(path))
        self._check_timeout()
        cwd = os.path.normpath(self._ftp_con.pwd())
        path = self._get_relative_path(path, cwd)
        # pylint: disable=W0703
        try:
            self._ftp_con.delete(path)
        except Exception as error:
            ret['returncode'] = -1
            ret['msg'] = str(error)
        return ret

    def get(self, path, localpath):
        """
        get a file into localpath

        :param localpath:
            local file path. If it ends with '/', the file name of path is
            appended to it.

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': 0,
            'msg': 'success'
        }
        log.info('to get ftp file {0} to {1}'.format(path, localpath))
        self._check_timeout()
        cwd = self._ftp_con.pwd()
        path = self._get_relative_path(path, cwd)
        if localpath.endswith('/'):
            localpath += path.split('/')[-1]
        log.info('to get ftp {0} to local {1}'.format(path, localpath))
        # pylint: disable=W0703
        try:
            # binary mode: retrbinary hands bytes to the callback
            with open(localpath, 'wb') as fhandle:
                ftp_cmd = 'RETR {0}'.format(path)
                self._ftp_con.retrbinary(ftp_cmd, fhandle.write)
        except Exception as error:
            ret['returncode'] = -1
            ret['msg'] = 'failed to get {0} to {1}, err:{2}'.format(
                path, localpath, error
            )
            log.error(ret['msg'])
        return ret

    def head(self, path):
        """
        get the file info

        For a file, only the LIST lines whose last field equals the file
        name are returned. For a directory, every LIST line is returned.

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                    'fileinfo': [
                        "-rw-rw-r-- 1 work work 201 Nov 9 17:03 __init__.py"
                    ]
                }

            'fileinfo' is only present on success.
        """
        ret = {
            'returncode': -1,
            'msg': 'failed to get objectinfo'
        }
        self._check_timeout()
        cwd = self._ftp_con.pwd()
        path = self._get_relative_path(path, cwd)
        res_info = []
        # pylint: disable=W0703
        try:
            if self.is_file(path):
                pos = path.rfind('/')
                file_name = path[pos + 1:]
                # a path without '/' lives in the current directory
                if pos > 0:
                    self._ftp_con.cwd(path[:pos])
                elif pos == 0:
                    self._ftp_con.cwd('/')
                lines = []
                self._ftp_con.retrlines('LIST', lines.append)
                for line in lines:
                    fields = line.split()
                    if fields and fields[-1].strip() == file_name:
                        res_info.append(line)
            else:
                self._ftp_con.cwd(path)
                self._ftp_con.retrlines('LIST', res_info.append)
            self._ftp_con.cwd(cwd)
            ret['fileinfo'] = res_info
            ret['returncode'] = 0
            ret['msg'] = 'success'
        except Exception as error:
            ret['returncode'] = -1
            ret['msg'] = str(error)
            # do not leave the connection in another directory
            self._restore_cwd(cwd)
        return ret

    def mkdir(self, path, recursive=True):
        """
        mkdir

        :param recursive:
            if True, every missing directory along path is created

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': 0,
            'msg': 'success'
        }
        self._check_timeout()
        cwd = self._ftp_con.pwd()
        path = self._get_relative_path(path, cwd)
        # pylint: disable=W0703
        try:
            if not recursive:
                self._ftp_con.mkd(path)
            else:
                if path.startswith('/'):
                    # absolute path outside cwd: walk down from the root
                    self._ftp_con.cwd('/')
                for subdir in path.split('/'):
                    if not subdir:
                        continue
                    try:
                        self._ftp_con.cwd(subdir)
                    except Exception:
                        # could not enter it: create it, then enter
                        self._ftp_con.mkd(subdir)
                        self._ftp_con.cwd(subdir)
        except Exception as error:
            ret['returncode'] = -1
            ret['msg'] = 'failed to mkdir, err:{0}'.format(error)
        finally:
            self._ftp_con.cwd(cwd)
        return ret

    def rmdir(self, path, recursive=True):
        """
        rmdir

        :param recursive:
            if True, files and sub directories inside path are deleted
            before path itself is removed

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': 0,
            'msg': 'success'
        }
        self._check_timeout()
        cwd = self._ftp_con.pwd()
        path = self._get_relative_path(path, cwd)
        # pylint: disable=W0703
        try:
            if not recursive:
                self._ftp_con.rmd(path)
            else:
                self._ftp_con.cwd(path)
                try:
                    all_items = self._ftp_con.nlst()
                except ftplib.error_perm:
                    # NOTE(review): some servers answer NLST on an empty
                    # directory with a 550 error. Treat it as "nothing to
                    # delete"; a real problem still surfaces in rmd below.
                    all_items = []
                failed = None
                for item in all_items:
                    # never walk into the directory itself or its parent
                    if item in ('.', '..'):
                        continue
                    if self.is_file(item):
                        self._ftp_con.delete(item)
                    else:
                        sub_ret = self.rmdir(item)
                        if sub_ret['returncode'] != 0:
                            failed = sub_ret
                            break
                self._ftp_con.cwd(cwd)
                if failed is not None:
                    # report the first sub directory that could not be
                    # removed instead of a later "not empty" error
                    ret = failed
                else:
                    self._ftp_con.rmd(path)
        except Exception as error:
            ret['returncode'] = -1
            ret['msg'] = 'failed to rmdir, err:{0}'.format(error)
        finally:
            self._ftp_con.cwd(cwd)
        return ret

    def is_file(self, path):
        """
        path is file or not

        A path the connection can cwd into is treated as a directory.
        Otherwise the MLSD listing of the parent directory is searched for
        an entry with that name and a "type=file" fact.

        :return:
            True if path is a file. False if it is a directory, does not
            exist or the check itself failed.
        """
        self._check_timeout()
        cwd = self._ftp_con.pwd()
        path = self._get_relative_path(path, cwd)
        # pylint: disable=W0703
        try:
            self._ftp_con.cwd(path)
            self._ftp_con.cwd(cwd)
            return False
        except Exception:
            # could not enter it, so it is not a directory
            pass
        res_info = []
        try:
            pos = path.rfind('/')
            file_name = path[pos + 1:]
            # a path without '/' lives in the current directory
            if pos > 0:
                self._ftp_con.cwd(path[:pos])
            elif pos == 0:
                self._ftp_con.cwd('/')
            self._ftp_con.retrlines('MLSD', res_info.append)
            self._ftp_con.cwd(cwd)
        except Exception:
            # do not leave the connection in another directory
            self._restore_cwd(cwd)
            return False
        for item in res_info:
            # an MLSD line is "fact=value;...; name"; the first space ends
            # the facts, which keeps ';' inside a file name intact
            facts, _, name = item.partition(' ')
            if name.strip() == file_name and 'type=file' in facts.lower():
                return True
        return False

    def rename(self, frompath, topath):
        """
        rename frompath to path

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': 0,
            'msg': 'success'
        }
        self._check_timeout()
        # pylint: disable=W0703
        try:
            self._ftp_con.rename(frompath, topath)
        except Exception as error:
            ret['returncode'] = -1
            ret['msg'] = 'failed to rename from {0} to {1}, err:{2}'.format(
                frompath, topath, error
            )
        return ret
class LocalObjectSystem(ObjectInterface):
    """
    local object system

    Objects are plain files and directories of the local filesystem.
    """
    def __init__(self, kvconfig=None):
        """
        initialize

        :param kvconfig:
            accepted but not used; the stored config is a dict whose
            'uri', 'user', 'passwords' and 'extra' values are all None
        """
        config = {
            'uri': None,
            'user': None,       # or stands for accesskey
            'passwords': None,  # or stands for secretkey
            'extra': None
        }
        ObjectInterface.__init__(self, config)

    def put(self, dest, localfile):
        """
        local object put == copy localfile to dest

        :param dest:
            path the object is stored at
        :param localfile:
            file to copy from

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': 0,
            'msg': 'success'
        }
        try:
            # copy2(src, dst): the local file is the source of a put
            shutil.copy2(localfile, dest)
        # pylint: disable=W0703
        except Exception as error:
            ret['returncode'] = -1
            ret['msg'] = 'failed to put:{0}'.format(error)
        return ret

    def delete(self, path):
        """
        delete a file in local

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': 0,
            'msg': 'success'
        }
        try:
            os.unlink(path)
        # pylint: disable=W0703
        except Exception as error:
            ret['returncode'] = -1
            ret['msg'] = 'failed to unlink file:{0}, err:{1}'.format(
                path, error
            )
        return ret

    def get(self, path, localpath):
        """
        get a file into localpath, i.e. copy path to localpath

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': 0,
            'msg': 'success'
        }
        try:
            # copy2(src, dst): the stored object is the source of a get
            shutil.copy2(path, localpath)
        # pylint: disable=W0703
        except Exception as error:
            ret['returncode'] = -1
            ret['msg'] = 'failed to get:{0}'.format(error)
        return ret

    def head(self, path):
        """
        get the object info

        :return:
            ::

                {
                    'returncode': 0 for success, 255 if path is not found,
                    'msg': 'if any',
                    'objectinfo': {
                        'size': st_size,
                        'atime': st_atime,
                        'mtime': st_mtime,
                        'ctime': st_ctime
                    }   # None if path is not found
                }
        """
        retcode = 0
        msg = 'ok'
        objectinfo = None
        # stat directly instead of exists() + stat(): the file could vanish
        # between the two calls
        try:
            statinfo = os.stat(path)
        except (OSError, ValueError):
            retcode = 255
            msg = 'file/dir not found'
        else:
            objectinfo = {
                'size': statinfo.st_size,
                'atime': statinfo.st_atime,
                'mtime': statinfo.st_mtime,
                'ctime': statinfo.st_ctime
            }
        info_dict = {
            'returncode': retcode,
            'msg': msg,
            'objectinfo': objectinfo
        }
        return info_dict

    def mkdir(self, path, recursive=True):
        """
        mkdir

        :param recursive:
            True for os.makedirs, False for os.mkdir

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': 0,
            'msg': 'success'
        }
        func = os.makedirs if recursive else os.mkdir
        try:
            func(path)
        # the os functions raise OSError, which is not an IOError on python2
        except (IOError, OSError) as error:
            ret['returncode'] = -1
            ret['msg'] = 'failed to mkdir, err:{0}'.format(error)
        return ret

    def rmdir(self, path, recursive=True):
        """
        rmdir

        :param recursive:
            True for shutil.rmtree, False for os.rmdir

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': 0,
            'msg': 'success'
        }
        func = shutil.rmtree if recursive else os.rmdir
        try:
            func(path)
        # the os functions raise OSError, which is not an IOError on python2
        except (IOError, OSError) as error:
            ret['returncode'] = -1
            ret['msg'] = 'failed to rmdir, err:{0}'.format(error)
        return ret

    def rename(self, frompath, topath):
        """
        rename from path to path

        :return:
            ::

                {
                    'returncode': 0 for success, others for failure,
                    'msg': 'if any'
                }
        """
        ret = {
            'returncode': 0,
            'msg': 'success'
        }
        try:
            os.rename(frompath, topath)
        # os.rename raises OSError, which is not an IOError on python2
        except (IOError, OSError) as error:
            ret['returncode'] = -1
            ret['msg'] = 'failed to rename {0} to {1}, err:{2}'.format(
                frompath, topath, error
            )
        return ret
# vi:set tw=0 ts=4 sw=4 nowrap fdm=indent