Repository URL to install this package:
Version:
5.0.6.dev10 ▾
|
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2014 TrilioData, Inc.
# All Rights Reserved.
import argparse
import errno
import functools
import glob
import json
import os
import shutil
import subprocess
import sys
from pwd import getpwnam
from tempfile import mkstemp

from cachetools import LRUCache
from fuse import FUSE, FuseOSError, Operations
from munch import munchify
from swiftclient.service import (
    get_conn
)
from swiftclient.utils import (
    config_true_value, ReadableToIterable, LengthWrapper, EMPTY_ETAG,
)

try:
    from oslo_config import cfg
except ImportError:
    from oslo_config import cfg
try:
    from oslo_log import log as logging
except ImportError:
    from nova.openstack.common import log as logging

from contego import utils
from . import vaultswift
# strftime pattern used as the default value of the 'log-date-format' option.
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
# System account whose uid/gid SwiftRepository looks up with getpwnam().
FUSE_USER = "nova"
# Verbosity switches (-d / -v); registered on CONF as CLI options further down.
common_cli_opts = [
    cfg.BoolOpt('debug',
                short='d',
                default=False,
                help='Print debugging output (set logging level to '
                'DEBUG instead of default WARNING level).'),
    cfg.BoolOpt('verbose',
                short='v',
                default=False,
                help='Print more verbose output (set logging level to '
                'INFO instead of default WARNING level).'),
]
# Command-line options that select where and how log records are written
# (config file, log file/dir, record and date format, syslog).
logging_cli_opts = [
    cfg.StrOpt('log-config',
               metavar='PATH',
               help='If this option is specified, the logging configuration '
               'file specified is used and overrides any other logging '
               'options specified. Please see the Python logging module '
               'documentation for details on logging configuration '
               'files.'),
    cfg.StrOpt('log-config-append',
               metavar='PATH',
               help='(Optional) Log Append'),
    cfg.StrOpt('watch-log-file',
               metavar='PATH',
               help='(Optional) Watch log'),
    cfg.StrOpt('log-format',
               default=None,
               metavar='FORMAT',
               help='A logging.Formatter log message format string which may '
               'use any of the available logging.LogRecord attributes. '
               'This option is deprecated. Please use '
               'logging_context_format_string and '
               'logging_default_format_string instead.'),
    cfg.StrOpt('log-date-format',
               default=_DEFAULT_LOG_DATE_FORMAT,
               metavar='DATE_FORMAT',
               help='Format string for %%(asctime)s in log records. '
               'Default: %(default)s'),
    cfg.StrOpt('log-file',
               metavar='PATH',
               deprecated_name='logfile',
               help='(Optional) Name of log file to output to. '
               'If no default is set, logging will go to stdout.'),
    cfg.StrOpt('log-dir',
               deprecated_name='logdir',
               help='(Optional) The base directory used for relative '
               '--log-file paths'),
    cfg.BoolOpt('use-syslog',
                default=False,
                help='Use syslog for logging.'),
    cfg.StrOpt('syslog-log-facility',
               default='LOG_USER',
               help='syslog facility to receive log lines')
]
# Generic logging switches: stderr output and the rate-limit burst size.
generic_log_opts = [
    cfg.BoolOpt('use_stderr',
                default=True,
                help='Log output to standard error'),
    cfg.IntOpt('rate_limit_burst',
               default=1,
               help='Burst limit')
]
# Log record format strings, per-logger default levels and related flags.
log_opts = [
    cfg.StrOpt('logging_context_format_string',
               default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
               '%(name)s [%(request_id)s %(user)s %(tenant)s] '
               '%(instance)s%(message)s',
               help='format string to use for log messages with context'),
    cfg.StrOpt('logging_default_format_string',
               default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
               '%(name)s [-] %(instance)s%(message)s',
               help='format string to use for log messages without context'),
    cfg.StrOpt('logging_debug_format_suffix',
               default='%(funcName)s %(pathname)s:%(lineno)d',
               help='data to append to log format when level is DEBUG'),
    cfg.StrOpt('logging_exception_prefix',
               default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
               '%(instance)s',
               help='prefix each line of exception output with this format'),
    # Each entry is a "logger=LEVEL" pair.
    cfg.ListOpt('default_log_levels',
                default=[
                    'amqplib=WARN',
                    'sqlalchemy=WARN',
                    'boto=WARN',
                    'suds=INFO',
                    'keystone=INFO',
                    'eventlet.wsgi.server=WARN'
                ],
                help='list of logger=LEVEL pairs'),
    cfg.BoolOpt('publish_errors',
                default=False,
                help='publish error events'),
    cfg.BoolOpt('fatal_deprecations',
                default=False,
                help='make deprecations fatal'),
    cfg.StrOpt('instance_format',
               default='[instance: %(uuid)s] ',
               help='If an instance is passed with the log message, format '
               'it like this'),
    cfg.StrOpt('instance_uuid_format',
               default='[instance: %(uuid)s] ',
               help='If an instance UUID is passed with the log message, '
               'format it like this'),
]
# Backup-vault settings: storage backend selection, NFS export details,
# Swift/Keystone credentials and segment/cache sizing.
contego_vault_opts = [
    cfg.StrOpt('vault_storage_type',
               default='nfs',
               help='Storage type: nfs, swift-i, swift-s'),
    cfg.StrOpt('vault_data_directory',
               help='Location where snapshots will be stored'),
    cfg.StrOpt('vault_data_directory_old',
               default='/var/triliovault',
               help='Location where snapshots will be stored'),
    # The tmpfs mount point is os.path.join(vault_data_directory_old, <this>).
    cfg.StrOpt('tmpfs_mount_path',
               default='tmpfs',
               help='Location with respect to CONF.vault_data_directory_old '
               'where tmpfs is mounted'),
    cfg.StrOpt('vault_storage_nfs_export',
               default='local',
               help='NFS Export'),
    cfg.StrOpt('vault_storage_nfs_options',
               default='nolock',
               help='NFS Options'),
    cfg.StrOpt('vault_swift_auth_version',
               default='KEYSTONE_V2',
               help='KEYSTONE_V2 KEYSTONE_V3 TEMPAUTH'),
    cfg.StrOpt('vault_swift_auth_url',
               default='http://localhost:5000/v2.0',
               help='Keystone Authorization URL'),
    cfg.StrOpt('vault_swift_tenant',
               default='admin',
               help='Swift tenant'),
    cfg.StrOpt('vault_swift_username',
               default='admin',
               help='Swift username'),
    # secret=True keeps the value out of oslo.config's option dumps.
    cfg.StrOpt('vault_swift_password',
               default='password',
               secret=True,
               help='Swift password'),
    cfg.StrOpt('region_name_for_services',
               default='RegionOne',
               help='Swift Region Name'),
    cfg.StrOpt('vault_swift_domain_id',
               default='default',
               help='Swift domain id'),
    cfg.StrOpt('vault_swift_domain_name',
               default='Default',
               help='Swift domain name'),
    cfg.StrOpt('vault_swift_container_prefix',
               default='TrilioVault',
               help='Swift Container Prefix'),
    # 33554432 bytes == 32 * 1024 * 1024.
    cfg.StrOpt('vault_swift_segment_size',
               default='33554432',
               help='Default segment size in bytes (32 MiB)'),
    cfg.IntOpt('vault_retry_count',
               default=2,
               help='The number of times we retry on failures'),
    cfg.StrOpt('vault_swift_url_template',
               default='http://localhost:8080/v1/AUTH_%(project_id)s',
               help='The URL of the Swift endpoint'),
    # Step used by the repositories when walking segment offsets.
    cfg.IntOpt('vault_segment_size',
               default=32 * 1024 * 1024,
               help='vault swift object segmentation size'),
    cfg.IntOpt('vault_cache_size',
               default=5,
               help='Number of segments of an object that need to be cached'),
    cfg.StrOpt('rootwrap_conf',
               default='/etc/nova/rootwrap.conf',
               metavar='PATH',
               help='rootwrap config file'),
    cfg.StrOpt('keystone_auth_version',
               default='2.0',
               help='keystone auth version'),
]
# Register every option group on the global config object, parse the
# command line, then set up logging for this process.
CONF = cfg.CONF
CONF.register_opts(contego_vault_opts)
for _cli_opts in (logging_cli_opts, generic_log_opts, log_opts,
                  common_cli_opts):
    CONF.register_cli_opts(_cli_opts)
CONF(sys.argv[1:], project='swift')
LOG = logging.getLogger(__name__)
logging.setup(cfg.CONF, 'swift')
# Only errors are emitted. Depending on which log module was imported the
# ERROR constant lives either on the module itself or on the stdlib module
# it re-exports as ``logging``; only a missing attribute triggers the
# fallback.
try:
    LOG.logger.setLevel(logging.ERROR)
except AttributeError:
    LOG.logger.setLevel(logging.logging.ERROR)
# Template of swift client options. The repository classes copy this dict,
# add per-call keys, wrap it with munchify() and hand it to get_conn() or
# the vaultswift.st_* helpers. Credential fields are filled in from CONF
# right after this literal.
options = {'sync_to': None, 'verbose': 1, 'header': [], 'auth_version': '1.0',
           'os_options': {'project_name': None,
                          'region_name': None,
                          'user_domain_name': None,
                          'endpoint_type': None,
                          'object_storage_url': None,
                          'project_domain_id': None,
                          'user_id': None,
                          'user_domain_id': None,
                          'tenant_id': None,
                          'service_type': None,
                          'project_id': None,
                          'auth_token': None,
                          'project_domain_name': None
                          },
           'ssl_compression': True,
           'os_storage_url': None,
           'os_username': '',
           'os_password': '',
           # TLS material is taken from the process environment.
           'os_cacert': os.environ.get('OS_CACERT'),
           'os_cert': os.environ.get('OS_CERT'),
           'os_key': os.environ.get('OS_KEY'),
           #'insecure': config_true_value(os.environ.get('SWIFTCLIENT_INSECURE')),
           'os_tenant_name': '',
           'os_auth_url': '',
           'os_auth_token': None,
           # NOTE(review): 'insecure' is hard-coded to True; the
           # environment-driven variant above is commented out.
           'insecure': True,
           'snet': False, 'sync_key': None, 'auth': '', 'user': '', 'key': '',
           'read_acl': None, 'info': False, 'retries': 5, 'write_acl': None, 'meta': [],
           'debug': False, 'use_slo': False, 'checksum': True, 'changed': False,
           'leave_segments': False, 'skip_identical': True, 'segment_threads': 10,
           'object_dd_threads': 10, 'object_uu_threads': 10, 'container_threads': 10,
           'yes_all': False, 'object_name': None,
           }
# Copy the vault_swift_* settings into the swift client option template.
if CONF.vault_swift_auth_version == 'TEMPAUTH':
    # Legacy v1 auth: only URL, user and key are needed.
    options.update({
        'auth_version': '1.0',
        'auth': CONF.vault_swift_auth_url,
        'user': CONF.vault_swift_username,
        'key': CONF.vault_swift_password,
    })
else:
    options['auth_version'] = CONF.keystone_auth_version
    if options['auth_version'] == '3':
        # Keystone v3 needs a domain; the id wins over the name when set.
        if CONF.vault_swift_domain_id != "":
            options['os_options'].update({
                'user_domain_id': CONF.vault_swift_domain_id,
                'domain_id': CONF.vault_swift_domain_id,
            })
        elif CONF.vault_swift_domain_name != "":
            options['os_options'].update({
                'user_domain_name': CONF.vault_swift_domain_name,
                'domain_name': CONF.vault_swift_domain_name,
            })
    options['os_options']['project_name'] = CONF.vault_swift_tenant
    options.update({
        'os_auth_url': CONF.vault_swift_auth_url,
        'os_username': CONF.vault_swift_username,
        'os_password': CONF.vault_swift_password,
        'os_domain_id': CONF.vault_swift_domain_id,
        'os_user_domain_id': CONF.vault_swift_domain_id,
        'os_tenant_name': CONF.vault_swift_tenant,
        'os_project_name': CONF.vault_swift_tenant,
        'os_region_name': CONF.region_name_for_services,
    })
    # needed to create Connection object
    options.update({
        'authurl': CONF.vault_swift_auth_url,
        'auth': CONF.vault_swift_auth_url,
        'user': CONF.vault_swift_username,
        'key': CONF.vault_swift_password,
    })
# Cache watermarks. Not referenced in the visible part of this file;
# presumably consumed by the FuseCache logic - TODO confirm.
CACHE_LOW_WATERMARK = 10
CACHE_HIGH_WATERMARK = 20
# Segment file/object name: 16 hex digits of byte offset, a dot, then
# 8 hex digits of version number.
SEGMENT_FORMAT = "%016x.%08x"
# Module-wide cache mapping; FuseCache.__init__ declares it ``global``.
lrucache = {}
def disable_logging(func):
    """Decorator that silences log records up to ERROR while *func* runs.

    Logging is re-enabled in a ``finally`` clause, so an exception raised by
    *func* no longer leaves logging disabled for the rest of the process.

    :param func: callable to wrap
    :returns: wrapper with the same signature and return value as *func*
    """
    # The oslo/nova log modules re-export the stdlib module as ``.logging``;
    # fall back to the object itself when it already is the stdlib module.
    py_logging = getattr(logging, 'logging', logging)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        py_logging.disable(py_logging.ERROR)
        try:
            return func(*args, **kwargs)
        finally:
            py_logging.disable(py_logging.NOTSET)
    return wrapper
def split_head_tail(path):
    """Split *path* into its first component and the remainder.

    ``"/container/a/b"`` gives ``("container", "a/b")``. Leading, trailing
    and repeated slashes are ignored, so ``"//container/a"`` no longer makes
    the function loop forever.

    :param path: slash-separated path, absolute or relative
    :returns: ``(first_component, rest)``; both are ``''`` for an empty or
              all-slash path
    """
    parts = [part for part in path.split('/') if part]
    if not parts:
        return '', ''
    return parts[0], '/'.join(parts[1:])
def get_head(path):
    """Return the directory part of *path* (everything before the last '/')."""
    return os.path.dirname(path)
class tmpfsfile(object):
    """Context manager yielding the path of a scratch file on the tmpfs mount.

    The file is created with mkstemp() under
    ``CONF.vault_data_directory_old/CONF.tmpfs_mount_path`` and removed on
    exit.

    :param remove: when True the freshly created file is deleted again before
                   the path is returned, so only a unique name is handed out
    """

    def __init__(self, remove=False):
        self.remove = remove

    def __enter__(self):
        tmpfs_mountpath = os.path.join(CONF.vault_data_directory_old,
                                       CONF.tmpfs_mount_path)
        fh, self.open_file = mkstemp(dir=tmpfs_mountpath)
        os.close(fh)
        if self.remove:
            os.remove(self.open_file)
        return self.open_file

    def __exit__(self, *args):
        # The file may legitimately be gone (remove=True and nothing
        # re-created it); do not let that mask the exception in flight.
        try:
            os.remove(self.open_file)
        except OSError as ex:
            if ex.errno != errno.ENOENT:
                raise
class ObjectRepository(object):
    """Base class for the object stores used by the FUSE layer.

    Every ``object_*`` hook is a no-op returning None; concrete
    repositories override the ones they support.

    :param root: local directory that backs the repository
    """

    def __init__(self, root, **kwargs):
        self.root = root

    def _full_path(self, partial):
        """Return *partial* joined under ``self.root``.

        All leading slashes are stripped first; stripping only one would let
        a path such as ``"//x"`` stay absolute and escape the root.
        """
        return os.path.join(self.root, partial.lstrip("/"))

    def object_open(self, object_name, flags):
        pass

    def object_upload(self, object_name, off, buf):
        pass

    def object_download(self, object_name, offset):
        pass

    def object_delete(self, object_name):
        pass

    def object_close(self, object_name, fh):
        pass

    def object_truncate(self, object_name, length, fh=None):
        pass

    def object_getattr(self, object_name, fh=None):
        pass

    def object_readdir(self, path, fh):
        pass

    def object_access(self, path, mode):
        pass

    def object_unlink(self, path):
        pass

    def object_statfs(self, path):
        pass
class SwiftRepository(ObjectRepository):
    """Object repository that stores segmented objects in Swift.

    ``self.root`` is a local cache directory. ``self.manifest`` maps each
    open object name to a dict with the keys ``'readonly'`` and (optionally)
    ``'segments-dir'`` plus one entry per integer segment offset.
    """

    def __init__(self, root, **kwargs):
        super(SwiftRepository, self).__init__(root, **kwargs)
        fuse_user = getpwnam(FUSE_USER)
        self.user_id = fuse_user.pw_uid
        self.group_id = fuse_user.pw_gid
        self.manifest = {}

    # ------------------------------------------------------------------
    # small helpers
    # ------------------------------------------------------------------
    def split_head_tail(self, path):
        """Split *path* into ``(first_component, rest)``.

        Used throughout to obtain ``(container, object_prefix)``. Empty
        components are ignored, so leading/trailing/double slashes are
        harmless.
        """
        parts = [part for part in path.split('/') if part]
        if not parts:
            return '', ''
        return parts[0], '/'.join(parts[1:])

    def _get_head(self, path):
        """Return the directory part of *path*."""
        return os.path.dirname(path)

    @staticmethod
    def _container_args(container):
        """Return the positional argument list for a vaultswift call."""
        return [] if container == '' else [container]

    @staticmethod
    def _listing_opts(prefix=None):
        """Return a copy of ``options`` prepared for stat/list calls."""
        opts = options.copy()
        opts.update({'delimiter': None, 'human': False, 'totals': False,
                     'long': False, 'prefix': prefix})
        return opts

    def _swift_delete(self, path, leave_segments=None):
        """Delete the container/object named by *path*; errors are logged.

        ``leave_segments`` is applied to the per-call copy of the options,
        never to the shared module-level dict.
        """
        container, obj = self.split_head_tail(path)
        _opts = options.copy()
        if leave_segments is not None:
            _opts['leave_segments'] = leave_segments
        _opts = munchify(_opts)
        args1 = [container]
        if obj != '' and obj != '/':
            args1.append(obj)
        try:
            vaultswift.st_delete(args1, _opts)
        except Exception as ex:
            LOG.exception(ex)

    def _get_cache(self, partial):
        """Return the cache path for *partial*, preparing the cache first.

        Side effects (all best effort): creates ``self.root`` via sudo when
        missing, fixes ownership, and mounts the tmpfs scratch area.
        """
        if partial.startswith("/"):
            partial = partial[1:]
        owner = '%s:%s' % (self.user_id, self.group_id)
        if not os.path.isdir(self.root):
            try:
                subprocess.call(['sudo', 'mkdir', self.root], shell=False)
                subprocess.call(['sudo', 'chown', owner, self.root],
                                shell=False)
            except Exception:
                pass
        else:
            stat_info = os.stat(self.root)
            if stat_info.st_uid != self.user_id or \
                    stat_info.st_gid != self.group_id:
                # NOTE(review): ownership is checked on self.root but the
                # chown targets vault_data_directory_old - confirm intent.
                subprocess.call(['sudo', 'chown', owner,
                                 CONF.vault_data_directory_old], shell=False)
        # mount /var/triliovault/tmpfs
        try:
            tmpfs_mountpath = os.path.join(CONF.vault_data_directory_old,
                                           CONF.tmpfs_mount_path)
            if not os.path.isdir(tmpfs_mountpath):
                utils.ensure_tree(tmpfs_mountpath)
            if not os.path.ismount(tmpfs_mountpath):
                command = ['timeout', '-sKILL', '30', 'sudo', 'mount',
                           '-t', 'tmpfs', '-o', 'size=200M,mode=0777',
                           "tmpfs", tmpfs_mountpath]
                subprocess.check_call(command, shell=False)
        except Exception:
            pass
        return os.path.join(self.root, partial)

    # ------------------------------------------------------------------
    # manifest handling
    # ------------------------------------------------------------------
    def _read_object_manifest(self, object_name):
        """Fetch and return the raw multipart manifest body of an object."""
        container, prefix = self.split_head_tail(object_name)
        _opts = options.copy()
        _opts['object_name'] = prefix
        conn = get_conn(munchify(_opts))
        conn.get_auth()
        rsp = conn.get_object(
            container, prefix,
            headers={},
            query_string='multipart-manifest=get',
            response_dict={}
        )
        return rsp[1]

    def _write_object_manifest(self, object_name, object_manifest,
                               metadata=None):
        """Upload *object_manifest* as the static-large-object manifest.

        :param object_manifest: JSON text listing the segments
        :param metadata: optional mapping stored as ``X-Object-Meta-<key>``
        """
        container, prefix = self.split_head_tail(object_name)
        put_headers = {'x-static-large-object': 'true'}
        for key, value in (metadata or {}).items():
            put_headers['X-Object-Meta-' + key] = value
        _opts = options.copy()
        _opts['object_name'] = prefix
        conn = get_conn(munchify(_opts))
        conn.put_object(
            container, prefix, object_manifest,
            headers=put_headers,
            query_string='multipart-manifest=put',
            response_dict={}
        )

    def _get_object_metadata(self, object_name):
        """Return the object's ``x-object-meta-*`` headers, prefix removed."""
        container, prefix = self.split_head_tail(object_name)
        args = self._container_args(container)
        _opts = munchify(self._listing_opts())
        if prefix != '':
            _opts['prefix'] = prefix
            args.append(prefix)
        st = vaultswift.st_stat(args, _opts)
        return dict(
            (key.split('x-object-meta-')[1], value)
            for key, value in st['headers'].items()
            if 'x-object-meta' in key)

    # ------------------------------------------------------------------
    # object operations
    # ------------------------------------------------------------------
    def object_open(self, object_name, flags):
        """Open *object_name* and return an fd on its local cache file.

        For read / read-write / append flag combinations the Swift manifest
        is loaded into ``self.manifest``; otherwise the open is treated as a
        create or truncating write.
        """
        full_path = self._full_path(object_name)
        read_only = flags in (os.O_RDONLY, 0x8000, 0x8800)
        state = self.manifest[object_name] = {'readonly': read_only}
        if read_only or flags in (0x8401, os.O_RDWR, 0x8002, 0x8802):
            # load manifest
            manifest = json.loads(self._read_object_manifest(object_name))
            for seg in manifest:
                # segment names look like "<dir>-segments/<offset>.<version>"
                offstr = seg['name'].split('-segments/')[1].split('.')[0]
                seg['modified'] = False
                state[int(offstr, 16)] = seg
            metadata = self._get_object_metadata(object_name)
            state['segments-dir'] = metadata['segments-dir']
            try:
                os.makedirs(self._full_path(state['segments-dir']))
            except Exception:
                pass
            with open(full_path, "w") as f:
                f.write(json.dumps(manifest))
        else:
            # this is either write or create request
            if flags in (0x8001, 0x8801, 0x0001):
                try:
                    self.object_unlink(object_name)
                except Exception:
                    pass
            if flags & os.O_WRONLY:
                # make sure the cache file exists before os.open()
                with open(full_path, "w"):
                    pass
            try:
                os.makedirs(self._full_path(object_name + "-segments"))
            except Exception:
                pass
        return os.open(full_path, flags)

    def object_close(self, object_name, fh):
        """Publish the manifest of a written object and drop local state.

        Errors while talking to Swift are logged, not raised. The fd and the
        local cache files are released in all cases.
        """
        segments_dir = object_name + "-segments"
        try:
            state = self.manifest[object_name]
            segments_dir = state.get('segments-dir', segments_dir)
            container, prefix = self.split_head_tail(segments_dir)
            if not state['readonly']:
                object_manifest = []
                offset = 0
                while offset in state:
                    seg = state[offset]
                    if seg['modified']:
                        stat = munchify(self.object_getattr(seg['name'], fh))
                        entry = {"path": seg['name'],
                                 "etag": stat.etag,
                                 "size_bytes": min(stat.st_size,
                                                   CONF.vault_segment_size)}
                    else:
                        entry = {"path": seg['name'],
                                 "etag": seg['hash'],
                                 "size_bytes": seg['bytes']}
                    object_manifest.append(entry)
                    offset += CONF.vault_segment_size
                self._write_object_manifest(
                    object_name, json.dumps(object_manifest),
                    metadata={'segments-dir': segments_dir})
                # Drop every stored version of a segment except the one the
                # manifest now references.
                offset = 0
                while True:
                    objects = self.segment_list(container, prefix, offset)
                    if len(objects) == 0:
                        break
                    c, keep = self.split_head_tail(state[offset]['name'])
                    for obj in set(objects) - set([keep]):
                        self.object_delete(os.path.join(container, obj))
                    offset += CONF.vault_segment_size
        except Exception as ex:
            LOG.exception(ex)
        finally:
            try:
                os.close(fh)
            except Exception:
                pass
            try:
                os.remove(self._full_path(object_name))
            except Exception:
                pass
            try:
                shutil.rmtree(self._full_path(segments_dir))
            except Exception:
                pass

    def object_upload(self, object_name, offset, buf):
        """Upload *buf* as the segment of *object_name* at *offset*.

        A segment already modified in this session is overwritten in place;
        otherwise the next version name for that offset is used.
        """
        state = self.manifest[object_name]
        if offset in state and state[offset]['modified'] is True:
            seg_fullname = state[offset]['name']
        else:
            segname = self.next_segname_from_offset(object_name, offset)
            segments_dir = state.get('segments-dir',
                                     object_name + "-segments")
            seg_fullname = os.path.join(segments_dir, segname)
        container, obj = self.split_head_tail(seg_fullname)
        # called for its side effect: makes sure tmpfs is mounted
        self._get_cache(seg_fullname)
        with tmpfsfile() as tempfs:
            # binary mode: buf holds raw segment bytes
            with open(tempfs, "wb") as f:
                f.write(buf)
            _opts = options.copy()
            _opts['segment_size'] = len(buf)
            _opts['object_name'] = obj.rstrip('/')
            _opts = munchify(_opts)
            try:
                vaultswift.st_upload([container, tempfs], _opts)
            except Exception as ex:
                LOG.exception(ex)
                raise
            if offset not in state:
                state[offset] = {}
            state[offset]['name'] = seg_fullname
            state[offset]['modified'] = True

    @disable_logging
    def object_download(self, object_name, offset):
        """Return the segment of *object_name* at *offset* as a bytearray.

        :raises Exception: when the manifest has no segment at *offset* or
                           the download fails
        """
        state = self.manifest[object_name]
        if offset not in state:
            raise Exception(
                'object %s segment %s not found' %
                (object_name, SEGMENT_FORMAT % (int(offset), 0)))
        seg_fullname = state[offset]['name']
        container, obj = self.split_head_tail(seg_fullname.rstrip('/'))
        # make sure that tmpfs is mounted
        self._get_cache(seg_fullname)
        with tmpfsfile(remove=True) as tempfs:
            _opts = options.copy()
            _opts['prefix'] = None
            _opts['out_directory'] = None
            _opts['out_file'] = tempfs.rstrip('/')
            _opts = munchify(_opts)
            try:
                vaultswift.st_download([container, obj], _opts)
                with open(tempfs.rstrip('/'), "rb") as f:
                    return bytearray(f.read())
            except Exception as ex:
                LOG.exception(ex)
                raise

    def object_delete(self, object_name):
        """Delete *object_name* from Swift; failures are logged."""
        self._swift_delete(object_name)

    def object_truncate(self, object_name, length, fh=None):
        pass

    def segment_list(self, container, segments_dir, offset):
        """List stored segment names for *offset* under *segments_dir*."""
        _opts = self._listing_opts(
            prefix=os.path.join(segments_dir, "%016x" % int(offset)))
        return vaultswift.st_list(self._container_args(container),
                                  munchify(_opts))

    def _latest_segment_version(self, path, offset):
        """Return the highest stored version at *offset*, or None."""
        container, prefix = self.split_head_tail(path)
        segments_dir = self.manifest[path].get('segments-dir', None)
        if segments_dir:
            segments_dir = self.split_head_tail(segments_dir)[1]
        else:
            segments_dir = prefix + "-segments"
        files = self.segment_list(container, segments_dir, offset)
        if len(files) == 0:
            return None
        newest = sorted(files)[-1]
        return int(newest.split(segments_dir)[1].split('.')[1], 16)

    def next_segname_from_offset(self, path, offset):
        """Return the segment name to use for the next write at *offset*."""
        version = self._latest_segment_version(path, offset)
        next_version = 0 if version is None else version + 1
        return SEGMENT_FORMAT % (int(offset), next_version)

    def curr_segname_from_offset(self, path, offset):
        """Return the name of the current segment at *offset*."""
        if self.manifest.get(path, None) and offset in self.manifest[path]:
            return self.manifest[path][offset]['name'].split('-segments/')[1]
        version = self._latest_segment_version(path, offset)
        return SEGMENT_FORMAT % (int(offset), version or 0)

    def object_getattr(self, object_name, fh=None):
        """Return a stat-like dict for *object_name*.

        The Swift stat is tried first; when it fails the local cache entry
        is used (and created when a listing shows the prefix exists).
        Returns None for editor scratch names ('4913', '...~').
        """
        container, prefix = self.split_head_tail(object_name)
        args = self._container_args(container)
        _opts = munchify(self._listing_opts())
        d = {}
        if prefix != '':
            _opts['prefix'] = prefix
            args.append(prefix)
        else:
            prefix = None
        try:
            headers = vaultswift.st_stat(args, _opts)['headers']
            # whole seconds of the Swift timestamp ("<secs>.<fraction>")
            timestamp = int(headers['x-timestamp'].split('.')[0])
            d['st_gid'] = self.group_id
            d['st_uid'] = self.user_id
            d['etag'] = headers.get('etag', "")
            d['st_atime'] = timestamp
            d['st_ctime'] = timestamp
            d['st_mtime'] = timestamp
            d['st_nlink'] = 1
            d['st_mode'] = 0o100755
            if prefix is not None and 'authorized_key' in prefix:
                d['st_mode'] = 0o100600
            if 'content-length' in headers:
                d['st_size'] = int(headers['content-length'])
            else:
                # Without a content-length every remaining case reports 0.
                d['st_size'] = 0
            if d['st_size'] == 0 and (container == '' or prefix is None):
                # account root or container: present it as a directory
                d['st_nlink'] = 3
                d['st_size'] = 4096
                d['st_mode'] = 0o40775
                container_path = self._get_cache(container)
                if not os.path.exists(container_path):
                    os.mkdir(container_path, 0o751)
        except Exception:
            if prefix is None:
                prefix = container
            full_path = self._get_cache(os.path.join(container, prefix))
            try:
                os.lstat(full_path)
            except Exception:
                args1 = args
                if len(args1) > 1:
                    args1.pop()
                try:
                    _opts['prefix'] = os.path.join(_opts['prefix'], '')
                    listing = vaultswift.st_list(args1, _opts)
                    if len(listing) > 0:
                        os.mkdir(full_path, 0o751)
                    else:
                        # NOTE(review): relative path, resolved against the
                        # process working directory - confirm intent.
                        os.mkdir(container, 0o751)
                except Exception:
                    pass
            if prefix == '4913' or prefix[:-1].endswith('~'):
                return
            st = os.lstat(full_path)
            d = dict(
                (key, getattr(st, key)) for key in (
                    'st_atime', 'st_ctime', 'st_gid', 'st_mode',
                    'st_mtime', 'st_nlink', 'st_size', 'st_uid'))
        # st_blksize and st_blocks are import for qemu-img info command to
        # display disk size attribute correctly. Without this information
        # it displays disk size 0
        d['st_blksize'] = 512
        d['st_blocks'] = d['st_size'] // 512
        return d

    def object_readdir(self, object_name, fh):
        """Yield the distinct first-level entry names below *object_name*.

        Entries whose name contains '-segments' or '_segments' are hidden.
        """
        container, prefix = self.split_head_tail(object_name)
        _opts = self._listing_opts()
        args = self._container_args(container)
        if prefix != '' and prefix is not None:
            _opts['prefix'] = prefix + '/'
        _opts = munchify(_opts)
        dirents = []
        for lst in vaultswift.st_list(args, _opts):
            if prefix and prefix not in lst:
                continue
            if prefix:
                component, rest = self.split_head_tail(
                    lst.split(prefix, 1)[1])
            else:
                component, rest = self.split_head_tail(lst)
            if component != '' or rest != '':
                # NOTE(review): relative path, not under the cache root -
                # confirm intent.
                try:
                    os.makedirs(
                        os.path.join(container, self._get_head(lst)),
                        mode=0o751)
                except Exception:
                    pass
            if component and '-segments' not in component and \
                    '_segments' not in component and \
                    component not in dirents:
                dirents.append(component)
        for entry in dirents:
            yield entry

    def object_access(self, object_name, mode):
        pass

    def object_unlink(self, object_name, leave_segments=False):
        """Delete *object_name*, optionally keeping its segment objects."""
        self._swift_delete(object_name, leave_segments=leave_segments)

    def object_statfs(self, object_name):
        """Return filesystem statistics derived from the account headers."""
        stv = vaultswift.st_stat([], munchify(options.copy()))
        headers = stv['headers']
        used = int(headers['x-account-bytes-used'])
        if 'x-account-meta-quota-bytes' in headers:
            f_blocks = int(headers['x-account-meta-quota-bytes'])
            f_bavail = f_blocks - used
        else:
            # NOTE(review): with no quota the *used* byte count is reported
            # as available space - confirm intent.
            f_blocks = -1
            f_bavail = used
        return {'f_blocks': f_blocks,
                'f_bfree': f_bavail,
                'f_bavail': f_bavail,
                'f_favail': 0,
                'f_frsize': 0}

    # ------------------------------------------------------------------
    # directory / attribute operations
    # ------------------------------------------------------------------
    def mkdir(self, path, mode, ist=False):
        """Create a container (top-level path) or a cache directory.

        Always returns 0; failures are logged.
        """
        LOG.debug("mkdir, %s", path)
        container, obj = self.split_head_tail(path)
        if (obj == '' or obj == '/') and ist is False:
            try:
                vaultswift.st_post([container], munchify(options.copy()))
            except Exception as ex:
                LOG.exception(ex)
            return 0
        target = path if ist else os.path.join(container, obj)
        try:
            os.makedirs(self._get_cache(target), mode)
        except Exception as ex:
            LOG.exception(ex)
        return 0

    def rmdir(self, path):
        """Delete *path* from Swift, then remove its cache directory."""
        LOG.debug("rmdir, %s", path)
        self._swift_delete(path)
        return os.rmdir(self._get_cache(path))

    def chmod(self, path, mode):
        """chmod the cache entry for *path*; errors are ignored."""
        LOG.debug("chmod, %s", path)
        try:
            container, prefix = self.split_head_tail(path)
            return os.chmod(
                self._get_cache(os.path.join(container, prefix)), mode)
        except Exception:
            pass

    def chown(self, path, uid, gid):
        """chown the cache entry for *path*; errors are ignored."""
        LOG.debug("chown, %s", path)
        try:
            container, prefix = self.split_head_tail(path)
            return os.chown(
                self._get_cache(os.path.join(container, prefix)), uid, gid)
        except Exception:
            pass

    def symlink(self, name, target):
        raise Exception("Not Applicable")

    def rename(self, old, new):
        """Rename by writing a new manifest that reuses the old segments.

        Always returns 0. On failure the error is logged and the (possibly
        partial) new manifest is removed, leaving segments in place.
        """
        LOG.debug("rename, %s -> %s", old, new)
        # make a copy of the manifest
        try:
            manifest = json.loads(self._read_object_manifest(old))
            metadata = self._get_object_metadata(old)
            segments_dir = metadata.get('segments-dir', old + '-segments')
            new_manifest = json.dumps(
                [{"path": man['name'],
                  "etag": man['hash'],
                  "size_bytes": man['bytes']} for man in manifest])
            self._write_object_manifest(
                new, new_manifest, metadata={'segments-dir': segments_dir})
            self.object_unlink(old, leave_segments=True)
        except Exception as ex:
            LOG.exception(ex)
            self.object_unlink(new, leave_segments=True)
        return 0

    def link(self, target, name):
        """Hard-link the cache entry of *target* to that of *name*."""
        LOG.debug("link, %s", target)
        container, prefix = self.split_head_tail(target)
        cache_path_target = self._get_cache(os.path.join(container, prefix))
        container, prefix = self.split_head_tail(name)
        cache_path_name = self._get_cache(os.path.join(container, prefix))
        return os.link(cache_path_target, cache_path_name)

    def utimens(self, path, times=None):
        """Set times on the cache entry when it exists; always returns 0."""
        LOG.debug("utimens, %s", path)
        cache_path = self._get_cache(path)
        # This file won't be existing until not downloaded for
        # writing/reading
        try:
            os.utime(cache_path, times)
        except Exception:
            pass
        return 0

    def destroy(self, path):
        """Unmount tmpfs, shut down the swift services, drop the cache."""
        try:
            tmpfs_mountpath = os.path.join(CONF.vault_data_directory_old,
                                           CONF.tmpfs_mount_path)
            if os.path.isdir(tmpfs_mountpath) and \
                    os.path.ismount(tmpfs_mountpath):
                command = ['timeout', '-sKILL', '30', 'sudo', 'umount',
                           tmpfs_mountpath]
                subprocess.check_call(command, shell=False)
        except Exception:
            pass
        for name in ('swift_list', 'swift_stat', 'swift_upload',
                     'swift_download', 'swift_delete', 'swift_post',
                     'swift_cap'):
            service = getattr(vaultswift, name)
            if service:
                service.__exit__(None, None, None)
        shutil.rmtree(self.root)
        return 0
class FileRepository(ObjectRepository):
def __init__(self, root, **kwargs):
super(FileRepository, self).__init__(root, **kwargs)
def object_open(self, object_name, flags):
if flags & (os.O_CREAT | os.O_WRONLY):
self.object_delete(object_name)
try:
segment_dir = self._full_path(object_name) + "-segments"
os.makedirs(segment_dir)
except BaseException:
pass
full_path = self._full_path(object_name)
return os.open(full_path, flags)
def object_upload(self, object_name, off, buf):
# always bump current version
segname = self.next_segname_from_offset(object_name, off)
segment_dir = self._full_path(object_name + "-segments")
seg_fullname = os.path.join(segment_dir, segname)
with open(seg_fullname, "w") as segf:
segf.write(buf)
def object_download(self, object_name, offset):
segment_dir = self._full_path(object_name + "-segments")
segname = self.curr_segname_from_offset(object_name, offset)
object_name = os.path.join(segment_dir, segname)
with open(object_name, "r") as segf:
return bytearray(segf.read())
def object_delete(self, object_name):
manifest_filename = self._full_path(object_name) + ".manifest"
try:
shutil.rmtree(manifest_filename.split(
".manifest")[0] + "-segments")
os.remove(manifest_filename)
except BaseException:
pass
def object_close(self, object_name, fh):
full_path = self._full_path(object_name)
manifest = full_path + ".manifest"
segment_dir = self._full_path(object_name + "-segments")
object_manifest = []
segments_list = {}
offset = 0
while True:
segments_list[offset] = objects = self.segment_list(
segment_dir, offset)
if len(objects) == 0:
break
objects = sorted(objects)
stat = os.stat(objects[-1])
object_manifest.append({"path": objects[-1],
"etag": "etagoftheobjectsegment1",
"size_bytes": min(stat.st_size,
CONF.vault_segment_size)})
offset += CONF.vault_segment_size
with open(manifest, "w") as manf:
manf.write(json.dumps(object_manifest))
offset = 0
while True:
objects = segments_list[offset]
if len(objects) == 0:
break
objects = sorted(objects)
for obj in list(set(objects) - set([objects[-1]])):
os.remove(obj)
offset += CONF.vault_segment_size
os.close(fh)
return
def object_truncate(self, object_name, length, fh=None):
full_path = self._full_path(object_name)
with open(full_path, 'r+') as f:
f.truncate(length)
def segment_list(self, segment_dir, offset):
return glob.glob(os.path.join(
segment_dir, '%016x.[0-9a-f]*' % int(offset)))
def next_segname_from_offset(self, path, offset):
segment_dir = self._full_path(path) + "-segments"
files = self.segment_list(segment_dir, offset)
if len(files) == 0:
return SEGMENT_FORMAT % (int(offset), int(0))
else:
manifestfile = self._full_path(path) + ".manifest"
with open(manifestfile, "r") as manf:
manifest = json.load(manf)
for obj in manifest:
segnum = '/%016x' % offset
if segnum in obj['path']:
segname = obj['path'].split(segment_dir)[1]
version = int(segname.split('.')[1], 16) + 1
return SEGMENT_FORMAT % (int(offset), int(version))
return SEGMENT_FORMAT % (int(offset), int(
sorted(files)[-1].split(segment_dir)[1].split('.')[1], 16) + 1)
def curr_segname_from_offset(self, path, offset, forread=False):
    """Return the name of the current segment stored at ``offset``.

    The version comes from the object's manifest entry for this offset,
    or, when the manifest has no entry for it, from the last segment
    file in sorted order. With no stored segment the name carries
    version 0 unless ``forread`` is set.

    :param path: object path, resolved through ``_full_path``.
    :param offset: segment start offset; converted with ``int()``.
    :param forread: when true, a missing segment is an error.
    :returns: ``SEGMENT_FORMAT`` filled with the offset and the version.
    :raises Exception: if ``forread`` is true and no segment exists.
    """
    # Normalise once so every format below gets an integer; '%x' rejects
    # floats.
    offset = int(offset)
    segment_dir = self._full_path(path) + "-segments"
    files = self.segment_list(segment_dir, offset)
    if not files:
        if forread:
            raise Exception("Segment does not exists")
        return SEGMENT_FORMAT % (offset, 0)

    with open(self._full_path(path) + ".manifest", "r") as manf:
        manifest = json.load(manf)

    segnum = '/%016x' % offset
    for obj in manifest:
        if segnum in obj['path']:
            # The file name is "<offset>.<version>", version in hex.
            version = int(os.path.basename(obj['path']).split('.')[1], 16)
            return SEGMENT_FORMAT % (offset, version)

    newest = sorted(files)[-1]
    version = int(os.path.basename(newest).split('.')[1], 16)
    return SEGMENT_FORMAT % (offset, version)
def object_getattr(self, object_name, fh=None):
    """Return stat attributes for an object as a dict.

    The values come from ``os.lstat`` on the backing path. When a
    readable, well-formed ``.manifest`` file sits next to it, ``st_size``
    is replaced by the sum of the manifest's ``size_bytes`` entries.

    :param object_name: object path, resolved through ``_full_path``.
    :param fh: accepted for interface compatibility; not used here.
    :returns: dict with the ``st_*`` keys listed below.
    :raises OSError: if ``os.lstat`` fails on the backing path.
    """
    full_path = self._full_path(object_name)
    st = os.lstat(full_path)
    attrs = {
        key: getattr(st, key)
        for key in (
            'st_atime',
            'st_ctime',
            'st_gid',
            'st_mode',
            'st_mtime',
            'st_nlink',
            'st_size',
            'st_uid')
    }

    try:
        with open(full_path + ".manifest") as manf:
            manifest = json.load(manf)
        logical_size = sum(seg['size_bytes'] for seg in manifest)
    except (IOError, OSError, ValueError, KeyError, TypeError):
        # No manifest, or an unusable one: keep the lstat size rather
        # than reporting a zero or partially summed value.
        pass
    else:
        attrs['st_size'] = logical_size
    return attrs
def object_readdir(self, path, fh):
    """List the objects stored in a directory.

    An object is recognised by its ``<name>.manifest`` file; only such
    entries are reported, with the suffix removed. Anything else in the
    directory is not listed.

    :param path: directory path, resolved through ``_full_path``.
    :param fh: accepted for interface compatibility; not used here.
    :returns: list of object names; empty if ``path`` is not a directory.
    """
    full_path = self._full_path(path)
    if not os.path.isdir(full_path):
        return []

    suffix = ".manifest"
    # Match the suffix only at the end of the name and strip exactly one
    # occurrence, so "x.manifest.bak" is skipped and "d.manifest.manifest"
    # is reported as "d.manifest".
    return [
        entry[:-len(suffix)]
        for entry in os.listdir(full_path)
        if entry.endswith(suffix) and len(entry) > len(suffix)
    ]
def object_access(self, path, mode):
    """Check that ``path`` is accessible with ``mode``.

    :param path: object path, resolved through ``_full_path``.
    :param mode: access mode passed to ``os.access``.
    :raises FuseOSError: with ``EACCES`` when ``os.access`` denies it.
    """
    if os.access(self._full_path(path), mode):
        return
    raise FuseOSError(errno.EACCES)
def object_unlink(self, path):
    """Unlink ``path`` by delegating to ``object_delete``.

    :param path: object path to remove.
    :returns: whatever ``object_delete`` returns.
    """
    outcome = self.object_delete(path)
    return outcome
def object_statfs(self, path):
    """Return filesystem statistics for ``path`` as a dict.

    :param path: path resolved through ``_full_path`` and passed to
        ``os.statvfs``.
    :returns: dict holding the ``f_*`` fields listed below.
    """
    stv = os.statvfs(self._full_path(path))
    wanted = (
        'f_bavail',
        'f_bfree',
        'f_blocks',
        'f_bsize',
        'f_favail',
        'f_ffree',
        'f_files',
        'f_flag',
        'f_frsize',
        'f_namemax',
    )
    return {field: getattr(stv, field) for field in wanted}
def destroy(self, path):
    """Delete the whole ``self.root`` directory tree.

    :param path: not used by this method.
    :returns: always 0.
    """
    root_dir = self.root
    shutil.rmtree(root_dir)
    return 0
class FuseCache(object):
    """Per-file-handle, segment-granular write-back cache over a repository.

    Each open file handle gets its own ``LRUCache`` keyed by segment
    start offset. Every value is a dict with ``'data'`` (the segment
    bytes) and ``'modified'`` (whether it still has to be uploaded).
    Modified segments are uploaded when they are evicted or flushed.
    """

    def __init__(self, root, repository):
        """Bind the cache to ``repository`` and the shared handle table.

        :param root: stored on the instance as ``self.root``.
        :param repository: backend providing ``object_open``,
            ``object_close``, ``object_upload``, ``object_download`` and
            ``object_truncate``.
        """
        self.root = root
        self.repository = repository
        # Module-level table mapping file handle -> per-handle state.
        self.lrucache = lrucache

    def _evict_one(self, object_name, cache):
        """Pop one segment from ``cache``, uploading it first if modified."""
        off, item = cache.popitem()
        if item['modified']:
            try:
                self.repository.object_upload(
                    object_name, off, item['data'])
            except Exception:
                # Keep the modified data cached so it is not silently
                # lost and a later flush can retry the upload.
                cache[off] = item
                raise

    def object_open(self, object_name, flags):
        """Open ``object_name`` in the repository and start a fresh cache.

        :returns: the file handle returned by the repository.
        """
        fh = self.repository.object_open(object_name, flags)
        # Drop any state left under the same handle value.
        self.lrucache.pop(fh, None)
        self.lrucache[fh] = {
            'lrucache': LRUCache(
                maxsize=CONF.vault_cache_size),
            'object_name': object_name}
        return fh

    def object_flush(self, object_name, fh):
        """Upload every modified cached segment of ``fh`` and empty its cache.

        :raises Exception: whatever ``object_upload`` raises; the segment
            that failed stays in the cache.
        """
        entry = self.lrucache[fh]
        assert entry['object_name'] == object_name
        cache = entry['lrucache']
        while len(cache):
            self._evict_one(object_name, cache)

    def object_close(self, object_name, fh):
        """Flush ``fh``, close it in the repository and forget its cache.

        The repository close and the cache removal run even when the
        flush fails; the flush error is still raised to the caller.
        """
        try:
            self.object_flush(object_name, fh)
        finally:
            try:
                self.repository.object_close(object_name, fh)
            finally:
                self.lrucache.pop(fh, None)

    def object_truncate(self, object_name, length, fh=None):
        """Delegate truncation to the repository."""
        self.repository.object_truncate(object_name, length, fh)

    def _walk_segments(self, offset, length):
        """Split the byte range ``[offset, offset + length)`` by segment.

        :returns: generator of ``(segment_start, offset_in_segment,
            byte_count)`` tuples, in ascending order.
        """
        segment_size = CONF.vault_segment_size
        while length > 0:
            # Integer arithmetic only: true division would produce float
            # offsets on Python 3 and defeat the segment alignment.
            base = offset % segment_size
            seg_offset = offset - base
            seg_len = min(length, segment_size - base)
            yield seg_offset, base, seg_len
            offset += seg_len
            length -= seg_len

    def object_read(self, object_name, length, offset, fh):
        """Read up to ``length`` bytes at ``offset`` through the cache.

        A segment that cannot be downloaded is treated as end of file;
        the bytes collected up to that point are returned.

        :returns: the data read, as ``bytes`` (possibly empty).
        """
        entry = self.lrucache[fh]
        assert entry['object_name'] == object_name
        cache = entry['lrucache']
        output_buf = bytearray()
        for segoffset, base, seg_len in self._walk_segments(offset, length):
            try:
                segdata = cache[segoffset]['data']
            except KeyError:
                # Cache miss: make room, then fetch from the repository.
                if len(cache) == CONF.vault_cache_size:
                    self._evict_one(object_name, cache)
                try:
                    segdata = self.repository.object_download(
                        object_name, segoffset)
                except Exception:
                    segdata = None
                if segdata is None:
                    # End of file.
                    break
                cache[segoffset] = {'modified': False, 'data': segdata}
            output_buf += segdata[base:base + seg_len]
        # bytes(), not str(): str() of a bytearray is its repr on Python 3.
        return bytes(output_buf)

    def object_write(self, object_name, buf, offset, fh):
        """Write ``buf`` at ``offset`` into the cached segments of ``fh``.

        Segments are loaded from the repository first so that writes not
        aligned to a segment boundary keep the surrounding data. Nothing
        is uploaded here except segments evicted to make room.

        :returns: ``len(buf)``.
        """
        entry = self.lrucache[fh]
        assert entry['object_name'] == object_name
        cache = entry['lrucache']
        bufptr = 0
        for segoffset, base, seg_len in self._walk_segments(offset, len(buf)):
            if segoffset not in cache:
                if len(cache) == CONF.vault_cache_size:
                    self._evict_one(object_name, cache)
                try:
                    segdata = self.repository.object_download(
                        object_name, segoffset)
                except Exception:
                    segdata = None
                if segdata is None:
                    # The segment has not been written to the repository.
                    cache[segoffset] = {
                        'modified': True, 'data': bytearray(1)}
                else:
                    cache[segoffset] = {'modified': False, 'data': segdata}

            slot = cache[segoffset]
            segdata = slot['data']
            if not isinstance(segdata, bytearray):
                # Downloaded data may be immutable; edit a mutable copy.
                segdata = slot['data'] = bytearray(segdata)
            if len(segdata) < base:
                # Zero-fill the gap between the current end and the write.
                segdata.extend(b'\0' * (base - len(segdata)))
            segdata[base:base + seg_len] = buf[bufptr:bufptr + seg_len]
            slot['modified'] = True
            bufptr += seg_len
        return len(buf)

    # This class defines no ``_full_path``, so the directory and
    # permission operations are delegated to the repository.
    def mkdir(self, path, mode):
        """Delegate ``mkdir`` to the repository."""
        return self.repository.mkdir(path, mode)

    def rmdir(self, path):
        """Delegate ``rmdir`` to the repository."""
        return self.repository.rmdir(path)

    def chmod(self, path, mode):
        """Delegate ``chmod`` to the repository."""
        return self.repository.chmod(path, mode)

    def chown(self, path, uid, gid):
        """Delegate ``chown`` to the repository."""
        return self.repository.chown(path, uid, gid)
class TrilioVault(Operations):
    """FUSE operations that route calls to a repository and its cache.

    Metadata and namespace operations go straight to ``self.repository``;
    open/read/write/flush/release go through ``self.cache`` (a
    ``FuseCache`` wrapping the same repository).
    """

    def __init__(self, root, repository=None):
        """
        :param root: stored as ``self.root``; also used to build the
            default repository and the cache.
        :param repository: backend to use; a ``SwiftRepository(root)`` is
            created when omitted.
        """
        self.root = root
        self.repository = repository or SwiftRepository(root)
        self.cache = FuseCache(root, self.repository)

    # Filesystem methods
    # ==================
    def access(self, path, mode):
        """Check access through the repository."""
        self.repository.object_access(path, mode)

    def chmod(self, path, mode):
        """Delegate to the repository."""
        return self.repository.chmod(path, mode)

    def chown(self, path, uid, gid):
        """Delegate to the repository."""
        return self.repository.chown(path, uid, gid)

    @disable_logging
    def getattr(self, path, fh=None):
        """Return the repository's attribute dict for ``path``."""
        return self.repository.object_getattr(path, fh)

    def readdir(self, path, fh):
        """Yield ``.``, ``..`` and then the repository's entries."""
        dirents = ['.', '..']
        dirents.extend(self.repository.object_readdir(path, fh))
        for r in dirents:
            yield r

    def readlink(self, path):
        """Return the target of the symlink at ``path``.

        Absolute targets are returned relative to ``self.root``.
        """
        # This class defines no ``_full_path``; use the repository's.
        pathname = os.readlink(self.repository._full_path(path))
        if pathname.startswith("/"):
            # Path name is absolute, sanitize it.
            return os.path.relpath(pathname, self.root)
        return pathname

    def mknod(self, path, mode, dev):
        """Create a filesystem node at the repository path for ``path``."""
        # This class defines no ``_full_path``; use the repository's.
        return os.mknod(self.repository._full_path(path), mode, dev)

    def rmdir(self, path):
        """Delegate to the repository."""
        return self.repository.rmdir(path)

    def mkdir(self, path, mode):
        """Delegate to the repository."""
        return self.repository.mkdir(path, mode)

    def statfs(self, path):
        """Return the repository's filesystem statistics."""
        return self.repository.object_statfs(path)

    def unlink(self, path):
        """Delegate to the repository."""
        return self.repository.object_unlink(path)

    def symlink(self, name, target):
        """Delegate to the repository."""
        return self.repository.symlink(name, target)

    def rename(self, old, new):
        """Delegate to the repository."""
        return self.repository.rename(old, new)

    def link(self, target, name):
        """Delegate to the repository."""
        return self.repository.link(target, name)

    def utimens(self, path, times=None):
        """Delegate to the repository."""
        return self.repository.utimens(path, times)

    # File methods
    # ============
    def open(self, path, mode):
        """Open ``path`` through the cache and return its file handle."""
        return self.cache.object_open(path, mode)

    def create(self, path, mode, fi=None):
        """Create ``path`` by opening it with ``os.O_CREAT``.

        NOTE(review): ``mode`` and ``fi`` are not forwarded; confirm the
        repository applies suitable permissions on its own.
        """
        return self.open(path, os.O_CREAT)

    def read(self, path, length, offset, fh):
        """Read through the cache."""
        return self.cache.object_read(path, length, offset, fh)

    def write(self, path, buf, offset, fh):
        """Write through the cache."""
        return self.cache.object_write(path, buf, offset, fh)

    def truncate(self, path, length, fh=None):
        """Truncate through the cache."""
        self.cache.object_truncate(path, length, fh=fh)

    def flush(self, path, fh):
        """Flush the cached segments of ``fh``."""
        self.cache.object_flush(path, fh)

    def release(self, path, fh):
        """Flush and close ``fh`` through the cache."""
        self.cache.object_close(path, fh)

    def fsync(self, path, fdatasync, fh):
        """Flush the cached segments of ``fh``; ``fdatasync`` is ignored."""
        return self.cache.object_flush(path, fh)

    def destroy(self, path):
        """Log the call and let the repository tear itself down."""
        LOG.debug("destroy, %s", path)
        return self.repository.destroy(path)
def fuse_conf(conf_path='/etc/fuse.conf'):
    """Make sure ``user_allow_other`` is enabled in the FUSE config file.

    If an uncommented ``user_allow_other`` line is already present the
    file is left untouched. Otherwise the existing content plus the
    directive is written to a private temporary file, which is then
    moved over ``conf_path`` and given to ``root:root`` using ``sudo``.

    :param conf_path: configuration file to check and update.
    :raises IOError: if ``conf_path`` exists but cannot be read.
    """
    try:
        with open(conf_path, 'r') as conf_file:
            current = conf_file.read()
    except (IOError, OSError) as ex:
        if ex.errno != errno.ENOENT:
            raise
        # No configuration yet: start from an empty file.
        current = ''

    for line in current.splitlines():
        # Drop trailing comments so "# user_allow_other" and
        # "#user_allow_other" both count as disabled.
        if line.split('#', 1)[0].strip() == 'user_allow_other':
            return

    if current and not current.endswith('\n'):
        current += '\n'

    # mkstemp gives an unpredictable name created exclusively, unlike a
    # fixed path under /tmp that another user could pre-create.
    fd, tmp_path = mkstemp(prefix='fuse.conf.')
    try:
        with os.fdopen(fd, 'w') as outf:
            outf.write(current + 'user_allow_other\n')
        # mkstemp creates the file 0600; the config must stay readable.
        os.chmod(tmp_path, 0o644)
        subprocess.call([shutil.which('sudo'), shutil.which('mv'),
                         tmp_path, conf_path])
        subprocess.call([shutil.which('sudo'), shutil.which('chown'),
                         'root:root', conf_path])
    finally:
        # Only still present when the move did not happen.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def main(mountpoint, cacheroot):
    """Prepare ``mountpoint`` and run the FUSE filesystem in the foreground.

    Any existing mount at ``mountpoint`` is lazily unmounted, the
    directory is created if needed and handed to ``FUSE_USER``; failures
    in this preparation are logged and do not stop the mount attempt.

    :param mountpoint: directory to mount the filesystem on.
    :param cacheroot: root directory passed to ``SwiftRepository`` and
        ``TrilioVault``.
    """
    def _sudo(program, *args):
        # Prefer the absolute path found on this user's PATH; fall back
        # to the bare name when ``which`` finds nothing instead of
        # passing None to subprocess.
        command = ['sudo', shutil.which(program) or program]
        command.extend(args)
        return subprocess.call(command, shell=False)

    owner = FUSE_USER + ':' + FUSE_USER
    try:
        try:
            _sudo('umount', '-l', mountpoint)
        except Exception:
            # Cleaning up a previous mount is best-effort.
            pass

        if os.path.isdir(mountpoint):
            _sudo('chown', '-R', owner, mountpoint)
        else:
            # -p so a mount point with missing parents can be created.
            _sudo('mkdir', '-p', mountpoint)
            _sudo('chown', owner, mountpoint)
    except Exception as ex:
        LOG.exception(ex)

    tvaultplugin = TrilioVault(cacheroot,
                               repository=SwiftRepository(cacheroot))
    FUSE(tvaultplugin, mountpoint,
         nothreads=True, foreground=True, nonempty=True,
         big_writes=True, direct_io=True, allow_other=True)
if __name__ == '__main__':
    # Script entry point: enable user_allow_other first, then mount the
    # filesystem on the configured data directory.
    fuse_conf()
    main(mountpoint=CONF.vault_data_directory,
         cacheroot=CONF.vault_data_directory_old)