Repository URL to install this package:
Version:
5.2.5 ▾
|
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2018 Trilio Data, Inc.
# All Rights Reserved.
""" Trilio Fuse plugin implimentation
This module is based on the vaultfuse.py module and will eventually
become the new vaultfuse.py module once integration and refactoring
of the existing vaultswift.py is complete.
Currently this module is temporary for the 2.6 release and will be merged
into a new vaultfuse.py for the next release.
"""
import os
import grp
import sys
import errno
import time
import getpass
from datetime import datetime
import copy
import json
import shutil
import functools
import threading
from threading import Thread
from pwd import getpwnam
# Import the correct thread safe queue version
try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty
from cachetools import LRUCache
try:
from fuse import FUSE, FuseOSError, Operations
except BaseException:
# Debian ships the module as "fusepy" instead of "fuse"
from fusepy import FUSE, FuseOSError, Operations
from botocore.exceptions import ClientError
from s3fuse.bunch_p3 import bunchify
try:
from oslo_config import cfg
except ImportError:
from oslo.config import cfg
from oslo_log import log as logging
from s3fuse.utils import get_ssl_cert_path
from s3fuse import vaults3
from s3fuse import config_parser
# Default value of the 'log-date-format' option declared in logging_cli_opts.
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
# Path component placed in front of every segment path built by
# _full_segment_path() / _get_segment_cache().
_SEGMENT_PREFIX = "80bc80ff-0c51-4534-86a2-ec5e719643c2"
# Default value of the 'queue_depth' option (maximum size of a worker pool's
# job queue).
QUEUE_DEPTH = 100
# Default value of the 'worker_pool_size' option (threads per worker pool).
WORKER_POOL_SIZE = 5
# Verbosity switches (-d / -v); registered on CONF as command-line options.
common_cli_opts = [
cfg.BoolOpt(
'debug',
short='d',
default=False,
help='Print debugging output (set logging level to '
'DEBUG instead of default WARNING level).',
),
cfg.BoolOpt(
'verbose',
short='v',
default=False,
help='Print more verbose output (set logging level to '
'INFO instead of default WARNING level).',
),
]
# Logging destination/format options; registered on CONF as command-line
# options. The option names and help texts look like the ones oslo.log
# defines itself -- presumably redeclared here so they can be passed on this
# process' command line; verify against the oslo.log version in use.
logging_cli_opts = [
cfg.StrOpt(
'log-config',
metavar='PATH',
help='If this option is specified, the logging configuration '
'file specified is used and overrides any other logging '
'options specified. Please see the Python logging module '
'documentation for details on logging configuration '
'files.',
),
cfg.StrOpt(
'log-config-append', metavar='PATH', help='(Optional) Log Append'
),
cfg.StrOpt('watch-log-file', metavar='PATH', help='(Optional) Watch log'),
cfg.StrOpt(
'log-format',
default=None,
metavar='FORMAT',
help='A logging.Formatter log message format string which may '
'use any of the available logging.LogRecord attributes. '
'This option is deprecated. Please use '
'logging_context_format_string and '
'logging_default_format_string instead.',
),
cfg.StrOpt(
'log-date-format',
default=_DEFAULT_LOG_DATE_FORMAT,
metavar='DATE_FORMAT',
# '%%' is an escaped percent sign; '%(default)s' is left as a placeholder
# inside the help text.
help='Format string for %%(asctime)s in log records. '
'Default: %(default)s',
),
cfg.StrOpt(
'log-file',
metavar='PATH',
deprecated_name='logfile',
help='(Optional) Name of log file to output to. '
'If no default is set, logging will go to stdout.',
),
cfg.StrOpt(
'log-dir',
deprecated_name='logdir',
help='(Optional) The base directory used for relative '
'--log-file paths',
),
cfg.BoolOpt('use-syslog', default=False, help='Use syslog for logging.'),
cfg.StrOpt(
'syslog-log-facility',
default='LOG_USER',
help='syslog facility to receive log lines',
),
]
# Log sink selection (stderr, journal, event log, JSON) plus the rate-limit
# burst size; registered on CONF as command-line options.
generic_log_opts = [
cfg.BoolOpt(
'use_stderr', default=True, help='Log output to standard error'
),
cfg.BoolOpt(
'use_journal',
default=False,
help='Enable Systemd native journal support',
),
cfg.BoolOpt(
'use_eventlog', default=False, help='Log output to Windows Event Log.'
),
cfg.BoolOpt(
'use_json',
default=False,
help='Enables JSON formatting in the logs when set to True',
),
cfg.IntOpt('rate_limit_burst', default=1, help='Burst limit'),
]
# Log rotation, format-string and per-logger level options; registered on
# CONF as command-line options.
log_opts = [
cfg.IntOpt(
'log_rotate_interval',
default=12,
# NOTE(review): the help text below talks about "rate limiting" although
# the option is named log_rotate_interval -- looks copied from
# rate_limit_interval; confirm and reword.
help='Interval, number of hours, of log rate limiting.',
),
cfg.IntOpt(
'max_logfile_count',
default=60,
help='Number of log files to retain.',
),
cfg.StrOpt(
'log_rotate_interval_type',
default='Hours',
help='Units of log rotation interval',
),
cfg.StrOpt(
'log_rotation_type',
default='interval',
help='type of log retention',
),
cfg.StrOpt(
'logging_context_format_string',
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s [%(request_id)s %(user)s %(tenant)s] '
'%(instance)s%(message)s',
help='format string to use for log messages with context',
),
cfg.StrOpt(
'logging_default_format_string',
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s [-] %(instance)s%(message)s',
help='format string to use for log messages without context',
),
cfg.StrOpt(
'logging_debug_format_suffix',
default='%(funcName)s %(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG',
),
cfg.StrOpt(
'logging_exception_prefix',
default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
'%(instance)s',
help='prefix each line of exception output with this format',
),
cfg.ListOpt(
'default_log_levels',
# Each entry has the form "<logger name>=<LEVEL>".
default=[
'amqplib=WARN',
'sqlalchemy=WARN',
'boto=WARN',
'suds=INFO',
'keystone=INFO',
'eventlet.wsgi.server=WARN',
],
help='list of logger=LEVEL pairs',
),
cfg.BoolOpt('publish_errors', default=False, help='publish error events'),
cfg.BoolOpt(
'fatal_deprecations', default=False, help='make deprecations fatal'
),
cfg.StrOpt(
'instance_format',
default='[instance: %(uuid)s] ',
help='If an instance is passed with the log message, format '
'it like this',
),
cfg.StrOpt(
'instance_uuid_format',
default='[instance: %(uuid)s] ',
help='If an instance UUID is passed with the log message, '
'format it like this',
),
cfg.IntOpt(
'rate_limit_interval',
default=0,
help='Interval, number of seconds, of log rate limiting.',
),
]
# Options describing the vault backend (S3 connection details, segment and
# cache sizing, worker-pool sizing). Registered on CONF as regular
# (config-file) options.
# Note that several boolean/integer-like settings (vault_s3_ssl,
# vault_s3_read_timeout, vault_enable_threadpool, ...) are declared as
# StrOpt and therefore hold strings such as 'True' or '30'.
vault_opts = [
    cfg.StrOpt('vault_storage_type',
               default='s3',
               help='Storage type: nfs, swift-i, swift-s, s3'),
    cfg.StrOpt('vault_data_directory',
               help='Location where snapshots will be stored'),
    cfg.StrOpt('vault_data_directory_old',
               default='/var/triliovault',
               help='Location where snapshots will be stored'),
    cfg.IntOpt('queue_depth',
               default=QUEUE_DEPTH,
               help='The number of writes to be queued (writeback)',),
    cfg.IntOpt('worker_pool_size',
               default=WORKER_POOL_SIZE,
               help='Number of worker threads to process the write queue',),
    cfg.IntOpt('vault_retry_count',
               default=2,
               help='The number of times we retry on failures'),
    cfg.IntOpt('vault_segment_size',
               default=32 * 1024 * 1024,
               help='vault object segmentation size'),
    cfg.IntOpt('vault_cache_size',
               default=5,
               help='Number of segments of an object that need to be cached'),
    cfg.StrOpt('rootwrap_conf',
               default='/etc/nova/rootwrap.conf',
               metavar='PATH',
               help='rootwrap config file'),
    cfg.StrOpt('vault_s3_auth_version',
               default='DEFAULT',
               help='S3 Authentication type'),
    cfg.StrOpt('vault_s3_access_key_id',
               default='',
               help='S3 Key ID'),
    cfg.StrOpt('vault_s3_secret_access_key',
               default='',
               help='S3 Secret Access Key'),
    cfg.StrOpt('vault_s3_region_name',
               default='',
               help='S3 Region'),
    cfg.StrOpt('vault_s3_bucket',
               default='',
               help='S3 Bucket'),
    cfg.StrOpt('vault_s3_endpoint_url',
               default='',
               help='S3 Endpoint URL'),
    cfg.StrOpt('vault_s3_ssl',
               default='False',
               help='Enable SSL certificate'),
    cfg.StrOpt('vault_s3_ssl_cert',
               default=get_ssl_cert_path(),
               help='Use SSL certificate bundle'),
    cfg.StrOpt('vault_s3_signature_version',
               default='default',
               help='S3 signature version to use'),
    cfg.StrOpt('vault_s3_read_timeout',
               default='30',
               help='Time in seconds to wait for a response to API calls'),
    cfg.StrOpt('vault_enable_threadpool',
               default='True',
               help='Enable backend thread pool'),
    cfg.StrOpt('vault_threaded_filesystem',
               default='True',
               help='Allow multiple file system threads'),
    cfg.StrOpt('max_uploads_pending',
               default='20',
               help='Number of file uploads.'),
    cfg.StrOpt('vault_cache_username',
               default='nova',
               help='System username.'),
    cfg.StrOpt('vault_logging_level',
               default='error',
               help='Logging level filter (debug, info, warn, error).'),
    cfg.StrOpt('vault_s3_max_pool_connections',
               default='500',
               help='The maximum number of connections to keep '
                    'in a connection pool'),
    cfg.BoolOpt('bucket_object_lock',
                default=False,
                help='S3 bucket object locking is enabled'),
    cfg.BoolOpt('use_manifest_suffix',
                default=False,
                # Adjacent literals are concatenated verbatim, so each
                # fragment must carry its own trailing space.
                help='To preserve backward compatibility, set this to False. '
                     'When set to False, s3fuse creates manifest file names '
                     'without suffix'),
]
# Global oslo.config object. The vault options are registered as regular
# options, all logging-related groups as command-line options.
CONF = cfg.CONF
CONF.register_opts(vault_opts)
CONF.register_cli_opts(logging_cli_opts)
CONF.register_cli_opts(generic_log_opts)
CONF.register_cli_opts(log_opts)
CONF.register_cli_opts(common_cli_opts)
# Parse the configuration at import time. When the FLAG environment variable
# is set, its value is used as the one and only command-line argument.
try:
flag = str(os.environ['FLAG'])
CONF([flag])
except BaseException:
# Any failure above (FLAG missing from the environment, or CONF rejecting
# the value) falls back to the real command line. BaseException also covers
# SystemExit/KeyboardInterrupt.
CONF(sys.argv[1:])
# Choose the implementation bound to the name ``fs``: the privsep variant
# when an 's3fuse_sys_admin' section is configured, the plain utils module
# otherwise.
if 's3fuse_sys_admin' in CONF.list_all_sections() or \
's3fuse_sys_admin' in config_parser.sections():
from s3fuse.privsep import fs
else:
from s3fuse import utils as fs
# The logger is named after the lower-cased storage type.
logging.setup(cfg.CONF, CONF.vault_storage_type.lower())
LOG = logging.getLogger(CONF.vault_storage_type.lower())
try:
# Logging level filters from most to least verbose.
if CONF.vault_logging_level.lower() == 'debug':
LOG.logger.setLevel(logging.DEBUG)
elif CONF.vault_logging_level.lower() == 'info':
LOG.logger.setLevel(logging.INFO)
elif CONF.vault_logging_level.lower() == 'warn':
LOG.logger.setLevel(logging.WARN)
else:
# 'error' and any unrecognised value end up here.
LOG.logger.setLevel(logging.ERROR)
except BaseException:
# Fallback that takes the level constant from ``logging.logging`` instead
# of ``logging`` (the oslo_log wrapper imported above).
LOG.logger.setLevel(logging.logging.ERROR)
# Module-wide option dictionary. It is handed to vaults3.S3Backend(...) and
# copied (options.copy()) by the repository methods before each backend call;
# option_config() overlays the S3 settings taken from CONF.
# NOTE(review): many keys (os_*, snet, use_slo, ...) look like Swift/Keystone
# client settings, presumably inherited from the vaultswift module mentioned
# in the module docstring -- confirm which of them the S3 backend reads.
options = {'sync_to': None, 'verbose': 1, 'header': [], 'auth_version': '1.0',
'os_options': {'project_name': None,
'region_name': None,
'user_domain_name': None,
'endpoint_type': None,
'object_storage_url': None,
'project_domain_id': None,
'user_id': None,
'user_domain_id': None,
'tenant_id': None,
'service_type': None,
'project_id': None,
'auth_token': None,
'project_domain_name': None
},
'ssl_compression': True,
'os_storage_url': None,
'os_username': '',
'os_password': '',
# Certificate locations are read from the environment once, at import time.
'os_cacert': os.environ.get('OS_CACERT'),
'os_cert': os.environ.get('OS_CERT'),
'os_key': os.environ.get('OS_KEY'),
'os_tenant_name': '',
'os_auth_url': '',
'os_auth_token': None,
'insecure': True,
'snet': False, 'sync_key': None, 'auth': '',
'user': '', 'key': '',
'read_acl': None, 'info': False,
'retries': 5, 'write_acl': None, 'meta': [],
'debug': False, 'use_slo': False,
'checksum': True, 'changed': False,
'leave_segments': False, 'skip_identical': True,
'segment_threads': 10,
'object_dd_threads': 10, 'object_uu_threads': 10,
'container_threads': 10,
'yes_all': False, 'object_name': None,
}
def option_config():
    """Copy the S3 settings from ``CONF`` into the module-level ``options``.

    Only the ``s3`` storage type with the ``DEFAULT`` auth version is
    supported; both are checked with ``assert``.

    ``options['s3_ssl_cert']`` ends up as the configured certificate bundle
    path only when ``vault_s3_ssl`` is ``'true'`` (case-insensitive) and the
    path exists; in every other case it is the empty string.

    Raises:
        AssertionError: if the storage type or auth version is unsupported.
    """
    assert CONF.vault_storage_type.lower() == 's3'
    assert CONF.vault_s3_auth_version == 'DEFAULT'

    options['auth_version'] = '1.0'
    options['user'] = CONF.vault_s3_access_key_id
    options['key'] = CONF.vault_s3_secret_access_key
    options['bucket'] = CONF.vault_s3_bucket
    options['s3_signature'] = CONF.vault_s3_signature_version
    options['s3_read_timeout'] = CONF.vault_s3_read_timeout
    options['vault_s3_max_pool_connections'] = \
        CONF.vault_s3_max_pool_connections
    options['bucket_object_lock'] = CONF.bucket_object_lock
    options['use_manifest_suffix'] = CONF.use_manifest_suffix
    options['s3_ssl_cert'] = ''
    options['retention_mode'] = 'GOVERNANCE'

    # Endpoint and region are only overridden when configured (non-empty).
    if CONF.vault_s3_endpoint_url:
        options['os_options']['object_storage_url'] = \
            CONF.vault_s3_endpoint_url
    if CONF.vault_s3_region_name:
        options['os_options']['region_name'] = CONF.vault_s3_region_name

    if CONF.vault_s3_ssl.lower() == 'true':
        if os.path.exists(CONF.vault_s3_ssl_cert):
            options['s3_ssl_cert'] = CONF.vault_s3_ssl_cert
        else:
            # The warning belongs here: SSL was requested but the bundle is
            # missing. Previously it was emitted when SSL was switched off,
            # and never in this situation.
            LOG.warn(
                'Invalid S3 SSL cert file path: %s. '
                'Using Default path, set the value to False '
                'for insecure connection'
                % CONF.vault_s3_ssl_cert
            )
# NOTE(review): the two watermarks are not referenced in the part of the file
# reviewed here -- presumably thresholds for the segment cache; confirm.
CACHE_LOW_WATERMARK = 10
CACHE_HIGH_WATERMARK = 20
# getpass.getuser() returns the login name of the user running this process;
# BackendRepository resolves it to a uid/gid via getpwnam().
FUSE_USER = getpass.getuser()
# Name of the group matching the process' real group id.
FUSE_GROUP = grp.getgrgid(os.getgid()).gr_name
# Segment file name: two hexadecimal fields, 16 and 8 digits wide. The callers
# fill in the segment offset and a sequence number that is bumped for every
# new upload at that offset.
SEGMENT_FORMAT = "%016x.%08x"
# NOTE(review): not referenced in the part of the file reviewed here.
NUMBER_UPLOADS_BEFORE_FLUSH = 1000
def disable_logging(func):
    """Decorator that silences logging while ``func`` runs.

    Records at level ERROR and below are disabled process-wide through the
    standard library's ``logging.disable()`` for the duration of the call.
    Logging is re-enabled in a ``finally`` clause, so it is restored even
    when ``func`` raises (previously an exception left logging disabled for
    the rest of the process).

    Args:
        func: the callable to wrap.

    Returns:
        A wrapper with the same signature and return value as ``func``.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # In this module ``logging`` is the oslo_log wrapper, which exposes
        # the standard library module as its ``logging`` attribute; fall
        # back to the name itself when it already is the stdlib module.
        std_logging = getattr(logging, 'logging', logging)
        std_logging.disable(std_logging.ERROR)
        try:
            return func(*args, **kwargs)
        finally:
            std_logging.disable(std_logging.NOTSET)
    return wrapper
def split_head_tail(path):
    """Split ``path`` into its first component and the remainder.

    Example: ``"/a/b/c"`` -> ``("a", "b/c")``; ``"/a"`` -> ``("a", "")``.

    Args:
        path (str): slash separated path, with or without a leading slash.

    Returns:
        tuple: ``(first_component, rest)`` where ``rest`` has no leading
        slash and is ``''`` when the path has a single component.
    """
    head, tail = os.path.split(path)
    prefix = ''
    # Peel components off the right until only the root (or nothing) is left.
    # A head made only of slashes ("//", "///") is returned unchanged by
    # os.path.split(), so comparing against '' and '/' alone never
    # terminates for paths that start with several slashes.
    while head.strip('/'):
        if prefix != '':
            prefix = os.path.join(tail, prefix)
        else:
            prefix = tail
        head, tail = os.path.split(head)
    return tail, prefix
def get_head(path):
    """Return everything in ``path`` before its last component."""
    return os.path.split(path)[0]
class ObjectRepository(object):
    """Base class for object repositories rooted at a local directory.

    It provides the path helpers; every ``object_*`` operation is a no-op
    returning ``None`` and is meant to be overridden by subclasses.
    """

    def __init__(self, root, **kwargs):
        # Extra keyword arguments are accepted and ignored.
        self.root = root

    def _full_segment_path(self, partial):
        """Return the path of ``partial`` below the segment prefix in root."""
        # Drop a single leading slash so the join stays relative.
        relative = partial[1:] if partial.startswith("/") else partial
        return self._full_path(os.path.join(_SEGMENT_PREFIX, relative))

    def _full_path(self, partial):
        """Return ``partial`` (minus one leading slash) joined onto root."""
        relative = partial[1:] if partial.startswith("/") else partial
        return os.path.join(self.root, relative)

    def object_open(self, object_name, flags):
        """No-op in the base class."""

    def object_upload(self, object_name, off, buf, fh):
        """No-op in the base class."""

    def object_download(self, object_name, offset, fh):
        """No-op in the base class."""

    def object_delete(self, object_name):
        """No-op in the base class."""

    def object_close(self, object_name, fh):
        """No-op in the base class."""

    def object_truncate(self, object_name, length, fh=None):
        """No-op in the base class."""

    def object_getattr(self, object_name, fh=None):
        """No-op in the base class."""

    def object_readdir(self, path, fh):
        """No-op in the base class."""

    def object_access(self, path, mode):
        """No-op in the base class."""

    def object_unlink(self, path):
        """No-op in the base class."""

    def object_statfs(self, path):
        """No-op in the base class."""
class BackendRepository(ObjectRepository):
def __init__(self, root, **kwargs):
    """Set up the repository state, the S3 backend and the open-time pool.

    Args:
        root: local directory the repository is rooted at (stored by the
            base class).
        **kwargs: forwarded to the base class.
    """
    super(BackendRepository, self).__init__(root, **kwargs)

    # uid/gid of the user this process runs as.
    pw_entry = getpwnam(FUSE_USER)
    self.user_id = pw_entry.pw_uid
    self.group_id = pw_entry.pw_gid

    # Per-object in-memory manifests, keyed by object name.
    self.manifest = {}

    # In the future (post 2.6) this should be the only place that needs to
    # change per backend (S3/Swift/future plugins).
    # NOTE - for now every worker thread creates its own backend as well.
    self.__backend = vaults3.S3Backend(options)

    # Object name -> lock guarding that object's manifest. object_open()
    # fills this in with threading.RLock instances; switching to plain
    # locks is a possible future optimisation since RLocks are slightly
    # slower.
    self.__manifest_lock = {}

    # Locking order is __manifest_lock -> __open_close_lock.
    self.__open_close_lock = threading.Lock()
    self.__in_open = {}
    self.__worker_pool = {}
    self.__pending_ios = {}

    # Single-threaded pool used by object_open() when it flushes the
    # initial manifest of a newly created object. Uploads on an open handle
    # get their own pool per file handle.
    self.__object_open_backend_pool = self.BackendWorkerPool(1, options)
class BackendWorker(Thread):
    """Daemon thread that asynchronously runs jobs taken from a job queue.

    Every queue item is a ``(func, args, kwargs)`` tuple; ``func`` is called
    with this worker's own backend instance as first argument. The thread
    starts itself from the constructor.
    """

    def __init__(self, job_queue, options):
        Thread.__init__(self)
        # Each thread gets a private backend instance because the S3 Boto3
        # API might not be 100% thread safe.
        self.__backend = vaults3.S3Backend(options)
        self.__job_queue = job_queue
        self._stop_event = threading.Event()
        self.daemon = True
        self.start()

    def stop(self):
        """Ask the worker loop to exit after its current iteration."""
        self._stop_event.set()

    def stopped(self):
        """Return True once stop() has been called."""
        return self._stop_event.is_set()

    def run(self):
        """Process queued jobs until the worker is stopped."""
        LOG.info(
            'Starting worker thread[%x].'
            % threading.current_thread().ident
        )
        while not self.stopped():
            try:
                # Wait at most one second so the stop flag is re-checked
                # regularly.
                job, positional, keyword = self.__job_queue.get(
                    block=True, timeout=1
                )
                try:
                    job(self.__backend, *positional, **keyword)
                finally:
                    # Always tell the queue the task is finished, otherwise
                    # a join() on the queue would never return.
                    self.__job_queue.task_done()
            except Empty:
                # Nothing queued within the timeout; loop again.
                pass
            except Exception as e:
                LOG.exception(e)
class BackendWorkerPool:
    """Fixed-size pool of BackendWorker threads fed by one bounded queue."""

    def __init__(self, num_threads, options):
        # put() blocks once CONF.queue_depth jobs are waiting.
        self.job_queue = Queue(CONF.queue_depth)
        self.workers = []
        for _ in range(num_threads):
            worker = BackendRepository.BackendWorker(self.job_queue, options)
            self.workers.append(worker)

    def __del__(self):
        # Signal every worker first, then wait for each of them to exit.
        for worker in self.workers:
            worker.stop()
        for worker in self.workers:
            worker.join()

    def add_job(self, func, *args, **kargs):
        """Queue ``func`` to be run by a worker with the given arguments."""
        job = (func, args, kargs)
        self.job_queue.put(job)

    def map(self, func, args_list):
        """Queue one job per element; each element is passed as a single
        positional argument."""
        for item in args_list:
            self.add_job(func, item)

    def wait_completion(self):
        """Block until every queued job has been processed."""
        self.job_queue.join()
def split_head_tail(self, path):
    """Split ``path`` into its first component and the remainder.

    Example: ``"/a/b/c"`` -> ``("a", "b/c")``; ``"/a"`` -> ``("a", "")``.

    Args:
        path (str): slash separated path, with or without a leading slash.

    Returns:
        tuple: ``(first_component, rest)`` where ``rest`` has no leading
        slash and is ``''`` when the path has a single component.
    """
    head, tail = os.path.split(path)
    prefix = ''
    # Peel components off the right until only the root (or nothing) is left.
    # A head made only of slashes ("//", "///") is returned unchanged by
    # os.path.split(), so comparing against '' and '/' alone never
    # terminates for paths that start with several slashes.
    while head.strip('/'):
        if prefix != '':
            prefix = os.path.join(tail, prefix)
        else:
            prefix = tail
        head, tail = os.path.split(head)
    return tail, prefix
def _get_head(self, path):
head, tail = os.path.split(path)
return head
def _get_segment_cache(self, partial):
    """Return the cache path of ``partial`` below the segment prefix."""
    # Only one leading slash is removed, so the join stays relative.
    relative = partial[1:] if partial.startswith("/") else partial
    return self._get_cache(os.path.join(_SEGMENT_PREFIX, relative))
def _get_cache(self, partial):
if partial.startswith("/"):
partial = partial[1:]
path = os.path.join(self.root, partial)
return path
def _read_object_manifest(self, object_name):
    """Fetch the manifest of ``object_name`` from the backend.

    Args:
        object_name (str): name of the object whose manifest is wanted.

    Returns:
        Whatever the backend's ``get_object_manifest()`` returns;
        object_open() iterates over it as a sequence of segment dicts.

    Raises:
        Any exception raised by the backend is propagated unchanged.
    """
    # The option bunch that used to be built here was never passed to the
    # backend, so it has been dropped.
    return self.__backend.get_object_manifest(object_name)
def _write_object_manifest(
    self, object_name, object_manifest, metadata=None
):
    """Upload ``object_manifest`` as the manifest of ``object_name``.

    Args:
        object_name (str): name of the object the manifest belongs to.
        object_manifest (list): segment descriptions, handed to the backend
            as-is.
        metadata (dict, optional): extra metadata; every key is sent as an
            ``x-object-meta-<key>`` header. ``None`` means no metadata.
    """
    put_headers = {'x-static-large-object': 'true'}
    # ``None`` replaces the former mutable ``{}`` default argument.
    for key, value in (metadata or {}).items():
        put_headers['x-object-meta-' + key] = value
    self.__backend.upload_object_manifest(
        object_name, put_headers, object_manifest
    )
def _create_segments_folder(self, object_name):
    """Create the ``<object_name>-segments`` folder under the segment
    prefix on the backend."""
    LOG.debug(("mkdir, %s" % object_name))
    backend_opts = bunchify(options.copy())
    segments_folder = object_name.lstrip("/") + "-segments"
    self.__backend.mkdir_object(
        [_SEGMENT_PREFIX, segments_folder], backend_opts
    )
def _get_object_metadata(self, object_name):
    """Return the metadata stored with ``object_name`` on the backend.

    The object is stat'ed through the backend and the entries of
    ``st['headers']['Metadata']`` are returned as a dict. Keys containing
    ``x-object-meta-`` are reduced to the text following that marker; all
    other keys are kept unchanged.
    """
    container, prefix = self.split_head_tail(object_name)
    args = [] if container == '' else [container]

    _opts = options.copy()
    _opts.update({
        'delimiter': None,
        'human': False,
        'totals': False,
        'long': False,
        'prefix': None,
    })
    _opts = bunchify(_opts)
    if prefix != '':
        _opts['prefix'] = prefix
        args.append(prefix)

    st = self.__backend.stat_object(args, _opts)
    marker = 'x-object-meta-'
    return {
        (key.split(marker)[1] if marker in key else key): value
        for key, value in st['headers']['Metadata'].items()
    }
"""
``r'' Open text file for reading. The stream is positioned at the
beginning of the file. (flags == 8000 or 8800)
``r+'' Open for reading and writing. The stream is positioned at the
beginning of the file. (flag 8002)
``w'' Truncate file to zero length or create text file for writing.
The stream is positioned at the beginning of the file.
(flags == 8001 or 8801)
``w+'' Open for reading and writing. The file is created if it does not
exist, otherwise it is truncated. The stream is positioned at
the beginning of the file. (flags == 8002 or 8802)
``a'' Open for writing. The file is created if it does not exist. The
stream is positioned at the end of the file. Subsequent writes
to the file will always end up at the then current end of file,
irrespective of any intervening fseek(3) or similar.
(flags == 8401)
``a+'' Open for reading and writing. The file is created if it does not
exist. The stream is positioned at the end of the file. Subse-
quent writes to the file will always end up at the then current
end of file, irrespective of any intervening fseek(3) or similar.
(flags == 8402)
"""
def object_open(self, object_name, flags):
# Open ``object_name`` with the given os.open() ``flags`` and return the
# file descriptor of its local cache file (self._full_path(object_name)).
#
# On the first open of an object (no entry in self.manifest) one of two
# things happens:
#   * read-only, O_RDWR or O_APPEND flags: the manifest and metadata are
#     loaded from the backend into self.manifest[object_name] and the
#     manifest is written as JSON to the cache file;
#   * otherwise: an empty cache file and segments folder are created and an
#     initial manifest is flushed through _object_close().
# Afterwards the cache file is opened, and the handle is registered together
# with a fresh worker pool and an empty pending-I/O set.
#
# NOTE(review): ``container`` and ``prefix`` are not used in this method.
container, prefix = self.split_head_tail(object_name)
full_path = self._full_path(object_name)
# The access mode is taken from the low byte of the flags.
readonly = (
(flags & int('ff', 16)) == os.O_RDONLY
)
# Create the per-object lock on demand and count this open as in flight;
# object_close() only drops the lock entry when this counter is zero.
with self.__open_close_lock:
self.__manifest_lock.setdefault(
object_name, threading.RLock()
)
self.__in_open.setdefault(object_name, 0)
self.__in_open[object_name] += 1
with self.__manifest_lock[object_name]:
self.__worker_pool.setdefault(object_name, {})
self.__pending_ios.setdefault(object_name, {})
if object_name not in self.manifest:
# since there is no way we can differentiate between
# r+ and w+, we only implement w+ funcionality for
# both r+ and w+
if (
(flags & int('ff', 16)) == os.O_RDONLY
or (flags & os.O_RDWR) == os.O_RDWR
or (flags & os.O_APPEND) == os.O_APPEND
):
# load manifest
try:
manifest = self._read_object_manifest(object_name)
except Exception:
LOG.info(
'Manifest for %s not found. Trying in 5 secs'
% object_name
)
time.sleep(
5
) # Give some time for s3 to settle down and try again
# A second failure propagates to the caller.
manifest = self._read_object_manifest(object_name)
# manifest = json.loads(manifest)
self.manifest[object_name] = {'open_handles': {}}
# Index the segments by offset; the offset is the hexadecimal number
# between '-segments/' and the first '.' of the segment name.
for seg in manifest:
offstr = seg['name'].split('-segments/')[1]
offstr = offstr.split('.')[0]
offset = int(offstr, 16)
seg['versionId'] = seg.get('versionId', "")
seg['modified'] = False
self.manifest[object_name][offset] = seg
# Object metadata is merged into the same dict as the offsets.
metadata = self._get_object_metadata(object_name)
self.manifest[object_name].update(metadata)
try:
segment_dir = self._full_path(
self.manifest[object_name]['segments-dir']
)
os.makedirs(segment_dir, exist_ok=True)
except BaseException:
# Best effort: covers a missing 'segments-dir' key as well as mkdir errors.
pass
# If the file was moved (or renamed) and the path changed
# as part of the rename, we need to make sure the new
# path exists before we write out the new manifest file.
os.makedirs(os.path.dirname(full_path), exist_ok=True)
with open(full_path, "w") as f:
f.write(json.dumps(manifest))
else:
os.makedirs(os.path.dirname(full_path), exist_ok=True)
# this is either write or create request
if (
(flags & os.O_WRONLY) == os.O_WRONLY
):
try:
self.object_unlink(object_name)
except BaseException:
pass
# Create/truncate the local cache file.
try:
with open(full_path, "w") as f:
pass
except BaseException:
pass
try:
segment_dir = self._full_segment_path(
object_name + "-segments"
)
os.makedirs(segment_dir, exist_ok=True)
except BaseException:
pass
# flush the new manifest to object store
# A temporary handle, served by the shared single-thread pool, is
# registered just long enough for _object_close() to write the manifest.
fh = os.open(full_path, flags)
try:
self._create_segments_folder(object_name)
self.__worker_pool[object_name][fh] = \
self.__object_open_backend_pool
self.manifest[object_name] = {'open_handles': {}}
self.manifest[object_name][
'open_handles'][fh] = {'readonly': False}
self._object_close(object_name, fh)
finally:
self.__worker_pool[object_name].pop(fh, None)
self.manifest[object_name][
'open_handles'].pop(fh, None)
os.close(fh)
try:
with open(full_path, "w") as f:
pass
except BaseException:
pass
# A vexing problem that causes cache file removed but
# some outstanding file handles in the manifest[object_name].
# As a temporary work around, we will touch the file
# before we open the file
try:
from pathlib import Path
Path(full_path).touch()
except BaseException:
pass
fh = os.open(full_path, flags)
try:
self.manifest[object_name][
'open_handles'][fh] = {'readonly': readonly}
# Each open handle gets its own pool of upload workers.
self.__worker_pool[object_name][fh] = self.BackendWorkerPool(
CONF.worker_pool_size, options
)
self.__pending_ios[object_name][fh] = set()
return fh
except Exception as ex:
LOG.exception(ex)
# Undo the partial registration before re-raising.
self.__worker_pool[object_name].pop(fh, None)
self.manifest[object_name]['open_handles'].pop(fh, None)
self.__pending_ios[object_name].pop(fh, None)
os.close(fh)
raise
finally:
# NOTE(review): this decrement is attached to the last try block only; an
# exception raised before that block is entered (for example while reading
# the manifest) leaves the __in_open counter incremented -- confirm whether
# that is intended.
with self.__open_close_lock:
self.__in_open[object_name] -= 1
def __purge_old_segments_task(self, backend, segment_list):
    """Worker-thread job that deletes obsolete segments.

    Failures are logged and swallowed, so a failed purge never propagates
    to the worker loop.

    Args:
        backend (class instance): Instance of the backend plugin.
        segment_list (list): List of segments to remove.
    """
    try:
        backend.delete_object_list(segment_list, bunchify(options.copy()))
    except Exception as ex:
        LOG.exception(ex)
def _object_close(self, object_name, fh):
""" Handle object_close by updating the manifest.
Waits for all of the background jobs to complete prior to updating
manifest using the main thread. Also queues up tasks to clean up
any old segments.
"""
# Block until every job queued on this handle's worker pool has finished.
self.__worker_pool[object_name][fh].wait_completion()
with self.__manifest_lock[object_name]:
try:
# Fall back to the default segments location when the manifest carries no
# 'segments-dir' metadata.
segments_dir = self.manifest[object_name].get(
'segments-dir',
"/" + _SEGMENT_PREFIX + object_name + "-segments",
)
container, prefix = self.split_head_tail(segments_dir)
object_manifest = []
segments = 0
# Read-only handles neither rewrite the manifest nor purge segments.
if not self.manifest[object_name][
'open_handles'][fh]['readonly']:
offset = 0
total_size = 0
segment_size = 0
# Walk the segments in offset order, one CONF.vault_segment_size step at a
# time, and stop at the first offset missing from the manifest.
while True:
if offset not in self.manifest[object_name]:
break
try:
st = self.object_getattr(
self.manifest[object_name][offset]['name'],
fh,
)
except BaseException:
# The extent must be present. Try one more time
time.sleep(5)
st = self.object_getattr(
self.manifest[object_name][offset]['name'],
fh,
)
if self.manifest[object_name][offset]['modified']:
# Segment written through this repository: describe it from the fresh stat
# result.
stat = bunchify(st)
segment_size = min(
stat.st_size, CONF.vault_segment_size
)
object_manifest.append(
{
"path": self.manifest[object_name][offset][
'name'
],
"versionId": stat.version_id,
"etag": stat.etag,
"size_bytes": segment_size,
"content_type": stat.content_type,
}
)
else:
# Untouched segment: reuse the values loaded with the manifest.
segment_size = self.manifest[object_name][offset][
'size_bytes'
]
object_manifest.append(
{
"path": self.manifest[object_name][offset][
'name'
],
"versionId": self.manifest[object_name][
offset].get('versionId', ""),
"etag": self.manifest[object_name][offset][
'hash'
],
"size_bytes": segment_size,
"content_type": self.manifest[object_name][
offset
]['content_type'],
}
)
offset += CONF.vault_segment_size
segments += 1
total_size += segment_size
# Counts and sizes are stored as strings in the metadata.
self._write_object_manifest(
object_name,
object_manifest,
metadata={
'segments-dir': segments_dir,
'segment-count': str(segments),
'total-size': str(total_size),
},
)
offset = 0
# Only cleanup extra segment versions if the manifest
# contains any segments.
while True and segments:
objects = self.segment_list(container, prefix, offset)
if len(objects) == 0:
break
objects = sorted(objects)
try:
p = self.manifest[object_name][offset]['name']
except:
# No manifest entry at this offset: everything listed is purged.
p = ""
p = p.strip("/")
# Everything listed at this offset except the segment referenced by the
# manifest is queued for deletion on the handle's worker pool.
purge_list = list(set(objects) - set([p]))
if len(purge_list) > 0:
self.__worker_pool[object_name][fh].add_job(
self.__purge_old_segments_task,
purge_list,
)
offset += CONF.vault_segment_size
return
except Exception as ex:
# Errors are logged and swallowed; the caller is not told that the close
# failed.
LOG.exception(ex)
def object_close(self, object_name, fh):
# Close file handle ``fh`` of ``object_name``: flush the manifest, drop the
# handle's bookkeeping, close the descriptor and, once the last handle is
# gone, forget the object's in-memory manifest.
#
# NOTE(review): _object_close() is called twice, once here and once under
# the manifest lock below. The first call queues purge jobs and the second
# call starts by waiting for the pool to drain -- presumably that is the
# reason; confirm.
self._object_close(object_name, fh)
with self.__manifest_lock[object_name]:
self._object_close(object_name, fh)
self.__worker_pool[object_name].pop(fh)
self.__pending_ios[object_name].pop(fh)
self.manifest[object_name]['open_handles'].pop(fh)
try:
os.close(fh)
except BaseException:
pass
if len(self.manifest[object_name]['open_handles']) == 0:
try:
# NOTE(review): object_open() creates these per-object entries as dicts,
# so pop() normally returns a dict and ``assert oh is None`` appears to fail
# whenever asserts are enabled; the except clause below then skips the
# remaining cleanup, including os.remove(). Confirm the intended condition.
oh = self.__worker_pool.pop(object_name, None)
assert oh is None
pios = self.__pending_ios.pop(object_name, None)
assert pios is None
os.remove(self._full_path(object_name))
except BaseException:
pass
self.manifest.pop(object_name)
# Drop the per-object lock only when no object_open() is in flight and the
# object has no manifest left.
with self.__open_close_lock:
if not self.__in_open[object_name]:
if object_name not in self.manifest:
self.__manifest_lock.pop(object_name, None)
def object_upload(self, object_name, offset, buf, fh):
    """Queue the upload of one segment of ``object_name``.

    The segment name is reused when the manifest already holds a segment
    at ``offset`` that was modified through this repository; otherwise the
    next free segment name for that offset is generated. The upload itself
    runs asynchronously on the worker pool of ``fh``; until it finishes,
    ``offset`` is listed in the handle's pending-I/O set.

    Args:
        object_name (str): object being written.
        offset (int): segment offset inside the object.
        buf: segment payload.
        fh (int): file handle returned by object_open().

    Raises:
        Exception: any error raised while preparing or queueing the job is
            logged and re-raised unchanged.
    """
    with self.__manifest_lock[object_name]:
        # Tracks whether this call marked the offset as pending, so the
        # error path only undoes what was actually done.
        registered = False
        try:
            if (
                offset in self.manifest[object_name]
                and self.manifest[object_name][offset]['modified'] is True
            ):
                seg_fullname = self.manifest[object_name][offset]['name']
            else:
                segname = self.__next_segname_from_offset(object_name,
                                                          offset)
                segments_dir = self.manifest[object_name].get(
                    'segments-dir',
                    "/" + _SEGMENT_PREFIX + object_name + "-segments",
                )
                seg_fullname = os.path.join(segments_dir, segname)
            _opts = options.copy()
            # NOTE(review): this tests for the *string* "0" while segment
            # offsets are stored as integers; the upload task sets
            # path_valid to True before using it anyway.
            _opts['path_valid'] = "0" in self.manifest[object_name]
            container, obj = self.split_head_tail(seg_fullname)
            _opts['segment_size'] = len(buf)
            _opts['object_name'] = obj.rstrip('/')
            _opts = bunchify(_opts)
            args1 = [container, buf, _opts['segment_size']]
            self.__pending_ios[object_name][fh].add(offset)
            registered = True
            self.__worker_pool[object_name][fh].\
                add_job(self.__object_upload_task,
                        args1, _opts, object_name,
                        offset, seg_fullname, fh)
        except Exception as ex:
            LOG.exception(ex)
            # The old handler asserted that the offset was pending, so a
            # failure before registration raised AssertionError and hid
            # the real error.
            if registered:
                self.__pending_ios[object_name][fh].discard(offset)
            raise
def __object_upload_task(self, backend, args1, _opts, object_name,
                         offset, seg_fullname, fh):
    """Worker-thread job that uploads one object segment.

    ``args1`` is ``[container, buffer, buffer_length]``. On success the
    manifest entry at ``offset`` is updated (name, modified flag, version
    id); in both outcomes ``offset`` is removed from the pending-I/O set of
    ``fh``. Upload errors are logged and re-raised.
    """
    LOG.info(
        'Object [%s] segment [%s] upload running in thread[%x].'
        % (object_name, seg_fullname, threading.current_thread().ident)
    )
    payload = args1[1]
    payload_len = args1[2]
    name_parts = args1[:1]  # object name parts

    with self.__manifest_lock[object_name]:
        assert offset in self.__pending_ios[object_name][fh]

    try:
        _opts.path_valid = True
        resp = backend.put_object(name_parts, payload, payload_len, _opts)
    except Exception as ex:
        LOG.exception(ex)
        with self.__manifest_lock[object_name]:
            pending = self.__pending_ios[object_name][fh]
            assert offset in pending
            pending.remove(offset)
        raise

    with self.__manifest_lock[object_name]:
        # Create the manifest entry on first upload at this offset.
        entry = self.manifest[object_name].setdefault(offset, {})
        entry['name'] = seg_fullname
        entry['modified'] = True
        entry['versionId'] = resp.get('VersionId', "")
        pending = self.__pending_ios[object_name][fh]
        assert offset in pending
        pending.remove(offset)
@disable_logging
def object_download(self, object_name, offset, fh):
    """Download the segment of ``object_name`` stored at ``offset``.

    Waits, polling every 100 ms, until no upload for ``offset`` is pending
    on ``fh``, then fetches the segment named in the manifest.

    Returns:
        bytearray: the segment contents.

    Raises:
        Exception: if the manifest has no segment at ``offset``; backend
            errors are re-raised as well.
    """
    while True:
        time.sleep(.1)
        with self.__manifest_lock[object_name]:
            if offset not in self.__pending_ios[object_name][fh]:
                if offset not in self.manifest[object_name]:
                    raise Exception(
                        'object %s not found ' % (object_name
                        + SEGMENT_FORMAT % (int(offset), int(0)))
                    )
                seg_fullname = self.manifest[object_name][offset]['name']
                break

    container, obj = self.split_head_tail(seg_fullname.rstrip('/'))
    version = self.manifest[object_name][offset].get('versionId', "")
    _opts = bunchify({'version_id': version})
    segment_ref = [container, obj]
    try:
        return bytearray(self.__backend.get_object(segment_ref, _opts))
    except Exception as ex:
        LOG.exception(ex)
        raise
def object_delete(self, object_name):
    """Delete ``object_name`` from the backend.

    Backend errors are logged and swallowed, so the call never raises on a
    failed delete.
    """
    # Make sure a lock exists even for objects that were never opened.
    self.__manifest_lock.setdefault(object_name, threading.RLock())
    with self.__manifest_lock[object_name]:
        container, obj = self.split_head_tail(object_name)
        backend_opts = bunchify(options.copy())
        target = [container]
        if obj not in ('', '/'):
            target.append(obj)
        try:
            self.__backend.delete_object(target, backend_opts)
        except Exception as ex:
            LOG.exception(ex)
def object_truncate(self, object_name, length, fh=None):
    """Truncate ``object_name`` to zero length.

    Only truncation to length 0 is performed, by opening the object
    write-only and closing it again; any non-zero ``length`` is ignored.
    The ``fh`` argument is not used.
    """
    if not length:
        handle = self.object_open(object_name, os.O_WRONLY)
        self.object_close(object_name, handle)
def segment_list(self, container, segments_dir, offset):
    """List the backend segments stored for ``offset``.

    The listing prefix is ``segments_dir`` joined with the offset rendered
    as 16 hexadecimal digits.

    Returns:
        Whatever the backend's ``list_segments()`` returns.
    """
    _opts = options.copy()
    _opts.update(
        delimiter=None,
        human=False,
        totals=False,
        long=False,
        prefix=os.path.join(segments_dir, "%016x" % int(offset)),
    )
    args = [] if container == '' else [container]
    # Need to resolve this with the Swift backend.
    # For now, we need to add back in the prefix for the segments.
    return self.__backend.list_segments(args, bunchify(_opts))
def __next_segname_from_offset(self, path, offset):
# Return the name (SEGMENT_FORMAT: offset, sequence number) to use for the
# next upload of the segment at ``offset`` of object ``path``: sequence 0
# when no segment exists yet for that offset, otherwise one more than the
# highest sequence number currently listed by the backend.
container, prefix = self.split_head_tail(path)
segments_dir = self.manifest[path].get('segments-dir', None)
if segments_dir:
# When the manifest names a segments dir below the segment prefix, the
# container to list is taken from that dir rather than from ``path``.
if segments_dir.startswith("/" + _SEGMENT_PREFIX):
container, prefix = self.split_head_tail(segments_dir)
c, p = self.split_head_tail(segments_dir)
segments_dir = p
else:
# No metadata yet: default segments dir derived from the object path.
segments_dir = prefix + "-segments"
files = self.segment_list(container, segments_dir, offset)
if len(files) == 0:
return SEGMENT_FORMAT % (int(offset), int(0))
# The sequence number is the hexadecimal field after the first '.' of the
# last name in sort order.
return SEGMENT_FORMAT % (
int(offset),
int(sorted(files)[-1].split(segments_dir)[1].split('.')[1], 16)
+ 1,
)
# Unreachable: both paths above return.
pass
def curr_segname_from_offset(self, path, offset):
    """Return the name of the current segment stored at ``offset``.

    If the manifest already records a segment at the offset, the part of
    its name after ``-segments/`` is returned. Otherwise the backend is
    listed and the highest-sorting existing segment is used; when none
    exists the suffix is 0.

    Args:
        path (str): Object path; its per-object lock must already exist.
        offset (int): Byte offset of the segment.

    Returns:
        str: Segment name relative to the segments directory.
    """
    with self.__manifest_lock[path]:
        container, prefix = self.split_head_tail(path)
        if self.manifest.get(path, None) and offset in self.manifest[path]:
            recorded = self.manifest[path][offset]['name']
            return recorded.split('-segments/')[1]

        seg_dir = self.manifest[path].get('segments-dir', None)
        if seg_dir:
            _, seg_dir = self.split_head_tail(seg_dir)
        else:
            seg_dir = prefix + "-segments"

        existing = self.segment_list(container, seg_dir, offset)
        if len(existing) == 0:
            return SEGMENT_FORMAT % (int(offset), 0)
        newest = sorted(existing)[-1]
        suffix = int(newest.split(seg_dir)[1].split('.')[1], 16)
        return SEGMENT_FORMAT % (int(offset), suffix)
def _stat_cache(self, object_cache_path):
""" Utility to check the tmpfs cache and return stat() from that.
This method looks to see if the object (directory) exists in the
local cache and will perform a stat() locally instead of calling
all the way to the backend.
Args:
object_cache_path (str): Cache path to the object we
need stat infor on.
Returns:
Dictionary of stat() information or None if object does not exist.
"""
if os.path.isdir(object_cache_path):
try:
st = os.lstat(object_cache_path)
stat_info = dict(
(key, getattr(st, key))
for key in (
'st_atime',
'st_ctime',
'st_gid',
'st_mode',
'st_mtime',
'st_nlink',
'st_size',
'st_uid',
)
)
# st_blksize and st_blocks are import for qemu-img
# info command to display disk size attribute correctly.
# Without this information it displays disk size 0
stat_info['st_blksize'] = 512
stat_info['st_blocks'] = int(stat_info['st_size'] // 512)
return stat_info
except Exception as e:
LOG.exception(e)
pass
return None
def object_getattr(self, object_name, fh=None):
    """Return a stat-style attribute dictionary for an object.

    The backend is asked first. When the backend raises ``ClientError``
    the local copy under ``_full_path`` is removed and ``os.lstat`` is
    called on it; any other backend failure falls back to the attributes
    of the local path.

    Args:
        object_name (str): Path of the object.
        fh: Not used by this method.

    Returns:
        dict: ``st_*`` fields, including ``st_blksize`` and ``st_blocks``.
        On the backend path the dict also carries ``etag``,
        ``version_id`` and ``content_type``.

    Raises:
        OSError: From ``os.lstat`` when the local path does not exist.
    """
    full_path = self._full_path(object_name)
    container, prefix = self.split_head_tail(object_name)
    _opts = options.copy()
    if container == '':
        args = []
    else:
        args = [container]
    _opts['delimiter'] = None
    _opts['human'] = False
    _opts['totals'] = False
    _opts['long'] = False
    _opts['prefix'] = None
    _opts = bunchify(_opts)
    d = {}
    if prefix != '':
        _opts['prefix'] = prefix
        args.append(prefix)
    else:
        # From here on "no prefix" is represented by None, not ''.
        prefix = None
    try:
        st = self.__backend.stat_object(args, _opts)
        d['st_gid'] = self.group_id
        d['st_uid'] = self.user_id
        d['etag'] = st['headers'].get('ETag', "")
        d['version_id'] = st['headers'].get('VersionId', "")
        d['content_type'] = st['headers'].get('ContentType', "")
        d['st_atime'] = int(st['timestamp'])
        d['st_ctime'] = int(st['timestamp'])
        d['st_mtime'] = int(st['timestamp'])
        d['st_nlink'] = 1
        # 33261 == 0o100755: regular file, rwxr-xr-x.
        d['st_mode'] = 33261
        if prefix is not None and 'authorized_key' in prefix:
            # 33152 == 0o100600: regular file, owner read/write only.
            d['st_mode'] = 33152
        d['st_size'] = int(st['size'])
        # Zero-sized entries without a prefix, and anything the backend
        # flags as a directory, are reported as directories.
        # NOTE(review): prefix == '' can no longer be true here because
        # an empty prefix was replaced by None above.
        if (
            (d['st_size'] == 0 and container == '')
            or (d['st_size'] == 0 and prefix is None)
            or (d['st_size'] == 0 and prefix == '')
            or st['directory'] is True
        ):
            d['st_nlink'] = 3
            d['st_size'] = 4096
            # 16893 == 0o40775: directory, rwxrwxr-x.
            d['st_mode'] = 16893
            # Mirror the directory into the local cache.
            container_path = self._get_cache(object_name)
            if not os.path.exists(container_path):
                os.mkdir(container_path, 0o751)
    except ClientError:
        try:
            # Clear the cache if the object name is a file
            os.remove(full_path)
        except Exception:
            pass
        # Clear the cache if the object name is a dir
        shutil.rmtree(full_path, ignore_errors=True)
        # The result is discarded; after the removals above this is
        # presumably expected to raise so the caller sees "not found".
        os.lstat(full_path)
    except Exception as ex:
        # Any other failure: answer from the local path instead.
        st = os.lstat(full_path)
        # if not os.path.isdir(full_path):
        # raise OSError(2, 'No such file or directory', full_path)
        d = dict(
            (key, getattr(st, key))
            for key in (
                'st_atime',
                'st_ctime',
                'st_gid',
                'st_mode',
                'st_mtime',
                'st_nlink',
                'st_size',
                'st_uid',
            )
        )
    # st_blksize and st_blocks are important for the qemu-img info command
    # to display the disk size attribute correctly. Without this
    # information it displays disk size 0.
    d['st_blksize'] = 512
    d['st_blocks'] = int(d['st_size'] // 512)
    return d
def _set_object_retention(self, object_name, retainuntil):
try:
# make sure we can parse the datetime string correctly.
dt = datetime.strptime(retainuntil, '%Y-%m-%dT%H:%M:%S')
except Exception as ex:
LOG.exception(ex)
raise
offset = 0
while offset in self.manifest[object_name]:
try:
self.__backend.put_object_segment_retention(
self.manifest[object_name][offset]['name'], dt
)
except BaseException:
# The extent must be present. Try one more time
time.sleep(5)
self.__backend.put_object_segment_retention(
self.manifest[object_name][offset]['name'], dt
)
offset += self.manifest[object_name][offset]['size_bytes']
# set the retention on manifest
try:
self.__backend.put_manifest_retention(object_name, dt)
except BaseException:
# The extent must be present. Try one more time
time.sleep(5)
self.__backend.put_manifest_retention(object_name, dt)
def object_setxattr(self, object_name, name, value):
    """Set an extended attribute on an object.

    The attribute ``retainuntil`` is special: it is applied as a
    retention date through ``_set_object_retention``. Every other
    attribute is decoded as UTF-8, pushed to the backend together with
    the object's existing attributes, and recorded in the in-memory
    manifest.

    Args:
        object_name (str): Path of the object.
        name (str): Attribute name.
        value (bytes): Attribute value; must support ``.decode``.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    # Hold a handle open for the duration of the update.
    fh = self.object_open(object_name, os.O_RDONLY)
    try:
        with self.__manifest_lock[object_name]:
            attrs = copy.deepcopy(self.manifest[object_name])
            # Segment entries are keyed by byte offset; walk them from 0
            # and drop them from the copy so only attributes remain.
            offset = 0
            while offset in self.manifest[object_name]:
                size = self.manifest[object_name][offset]['size_bytes']
                attrs.pop(offset, None)
                offset += size
            attrs.pop('open_handles', None)
            if name == 'retainuntil':
                self._set_object_retention(object_name,
                                           value.decode("utf8"))
            else:
                attrs[name] = value.decode("utf-8")
                self.__backend.update_object_attributes(
                    object_name, attrs)
                # NOTE(review): the manifest keeps the raw `value`,
                # while the backend receives the decoded string.
                self.manifest[object_name][name] = value
    except Exception as ex:
        LOG.exception(ex)
        raise
    finally:
        self.object_close(object_name, fh)
def object_removexattr(self, object_name, name):
    """Remove an extended attribute from an object.

    The remaining attributes are pushed to the backend and the attribute
    is dropped from the in-memory manifest. Removing an attribute that
    does not exist is not an error.

    Args:
        object_name (str): Path of the object.
        name (str): Attribute name to remove.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    handle = self.object_open(object_name, os.O_RDONLY)
    try:
        with self.__manifest_lock[object_name]:
            remaining = copy.deepcopy(self.manifest[object_name])
            # Segment entries are keyed by byte offset; strip them from
            # the copy so only attributes are sent to the backend.
            pos = 0
            while pos in self.manifest[object_name]:
                step = self.manifest[object_name][pos]['size_bytes']
                remaining.pop(pos, None)
                pos += step
            for key in (name, 'open_handles'):
                remaining.pop(key, None)
            self.__backend.update_object_attributes(object_name, remaining)
            self.manifest[object_name].pop(name, None)
    except Exception as ex:
        LOG.exception(ex)
        raise
    finally:
        self.object_close(object_name, handle)
def object_listxattr(self, object_name):
    """Return the extended attribute names recorded for an object.

    The manifest entry of an object also holds segment records keyed by
    integer byte offset and an ``open_handles`` bookkeeping entry. Those
    are not attributes, so only string keys other than ``open_handles``
    are returned. A missing ``open_handles`` entry is not an error.

    Args:
        object_name (str): Path of the object.

    Returns:
        list: Attribute names (str).

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    fh = self.object_open(object_name, os.O_RDONLY)
    try:
        with self.__manifest_lock[object_name]:
            return [
                key for key in self.manifest[object_name]
                if isinstance(key, str) and key != 'open_handles'
            ]
    except Exception as ex:
        LOG.exception(ex)
        raise
    finally:
        self.object_close(object_name, fh)
def object_getxattr(self, object_name, name):
    """Return the value of one extended attribute as bytes.

    String values are encoded as UTF-8. Values already held as bytes
    (``object_setxattr`` stores the raw value it was given) are returned
    unchanged, and any other value is rendered with ``str`` first.

    Args:
        object_name (str): Path of the object.
        name (str): Attribute name.

    Returns:
        bytes: The attribute value.

    Raises:
        FuseOSError: ``ENODATA`` when the attribute does not exist.
        Exception: Any other failure is logged and re-raised.
    """
    fh = self.object_open(object_name, os.O_RDONLY)
    try:
        with self.__manifest_lock[object_name]:
            entries = self.manifest[object_name]
            missing = name not in entries
            value = None if missing else entries[name]
    except Exception as ex:
        LOG.exception(ex)
        raise
    finally:
        self.object_close(object_name, fh)

    if missing:
        # Report "no such attribute" with its proper errno rather than
        # logging a KeyError for every probe of an absent attribute.
        raise FuseOSError(errno.ENODATA)
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)
    if not isinstance(value, str):
        value = str(value)
    return bytes(value, 'utf-8')
def object_readdir(self, object_name, fh):
    """Yield the entry names found directly under a directory.

    The backend is listed with the directory path normalised to end in
    exactly one "/"; each returned path is reduced to its last component.

    Args:
        object_name (str): Directory path.
        fh: Not used by this method.

    Yields:
        str: One entry name per listed object.
    """
    prefix = object_name.rstrip("/") + "/"
    names = [
        entry.split("/")[-1]
        for entry in self.__backend.list_objects(prefix)
    ]
    for entry_name in names:
        yield entry_name
def object_access(self, object_name, mode):
    """Access check hook; performs no check and always returns None."""
    return None
def object_unlink(self, object_name, leave_segments=False):
    """Remove an object's manifest and, optionally, its segments.

    Every step is best effort: failures are logged (or ignored for the
    local cache cleanup) and the method still goes on to delete the
    manifest.

    Args:
        object_name (str): Path of the object.
        leave_segments (bool): When True only the manifest is deleted and
            the segments are kept.
    """
    # NOTE(review): this writes to the module-level `options`, so the
    # flag stays visible to every later options.copy() — confirm that
    # this is intended.
    options['leave_segments'] = leave_segments
    _opts = options.copy()
    _opts = bunchify(_opts)
    if not leave_segments:
        try:
            obj_manifest = self._read_object_manifest(object_name)
            obj_metadata = self._get_object_metadata(object_name)
            # Collect the backend names of all segments and delete them
            # in a single call.
            segment_list = []
            for man in obj_manifest:
                segment_list.append(man['name'])
            self.__backend.delete_object_list(segment_list, _opts)
            segments_dir = obj_metadata.get('segments-dir')
            if segments_dir is not None:
                args = [segments_dir]
                self.__backend.rmdir_object(args, _opts)
                # Drop the matching directory from the local cache too.
                try:
                    os.rmdir(self._get_cache(segments_dir))
                except BaseException:
                    pass
            # Drop the object's own entry from the local cache.
            try:
                os.remove(self._get_cache(object_name))
            except BaseException:
                pass
        except Exception as ex:
            LOG.exception(ex)
            pass
    # The manifest is removed last, whether or not segments were deleted.
    try:
        self.__backend.delete_object_manifest(object_name)
    except Exception as ex:
        LOG.exception(ex)
        pass
def object_statfs(self, object_name):
    """Return filesystem statistics derived from a backend stat call.

    Args:
        object_name (str): Not used; the backend is queried with no
            arguments.

    Returns:
        dict: ``f_blocks``, ``f_bfree``, ``f_bavail``, ``f_favail`` and
        ``f_frsize``.

    Raises:
        FuseOSError: ``ENOENT`` when the backend stat fails.
    """
    backend_opts = bunchify(options.copy())
    try:
        stv = self.__backend.stat_object([], backend_opts)
    except Exception as ex:
        LOG.exception(ex)
        raise FuseOSError(errno.ENOENT)

    quota_key = 'x-account-meta-quota-bytes'
    # NOTE(review): the quota key is looked for in stv['headers'] but the
    # values are then read from stv itself — confirm which is intended.
    if quota_key in stv['headers']:
        f_blocks = int(stv[quota_key])
        f_bavail = int(stv[quota_key]) - int(stv['x-account-bytes-used'])
    else:
        f_blocks = -1
        f_bavail = int(stv['size'])

    return {
        'f_blocks': f_blocks,
        'f_bfree': f_bavail,
        'f_bavail': f_bavail,
        'f_favail': 0,
        'f_frsize': 0,
    }
def mkdir(self, path, mode, ist=False):
    """Create a directory in the backend and mirror it in the local cache.

    Both steps are best effort. If the backend call fails the error is
    logged and the cache directory is not created. A failure to create
    the cache directory is logged only.

    Args:
        path (str): Directory path.
        mode (int): Permission bits for the local cache directory.
        ist (bool): When False (the default) the cache path is derived
            from the container/object split of ``path``; otherwise it is
            derived from ``path`` as given.

    Returns:
        int: Always 0.
    """
    LOG.debug(("mkdir, %s" % path))
    container, obj = self.split_head_tail(path)
    _opts = bunchify(options.copy())
    try:
        self.__backend.mkdir_object([container, obj], _opts)
    except Exception as ex:
        LOG.exception(ex)
        return 0

    # The backend directory exists; now mirror it locally.
    cache_path = self._get_cache(path)
    if ist is False:
        cache_path = self._get_cache(os.path.join(container, obj))
    try:
        os.makedirs(cache_path, mode)
    except Exception as ex:
        LOG.exception(ex)
    return 0
def rmdir(self, object_name):
    """Remove an empty directory from the backend and the local caches.

    Args:
        object_name (str): Directory path.

    Raises:
        FuseOSError: ``ENOTEMPTY`` when the backend listing of the
            directory returns at least one item.
    """
    container, obj = self.split_head_tail(object_name)
    _opts = options.copy()
    _opts['prefix'] = None
    _opts = bunchify(_opts)
    # Normalise to exactly one trailing "/" for the listing call.
    path = object_name.rstrip("/") + '/'
    dir_items = []
    try:
        dir_items += self.__backend.list_objects(path, max_items=2)
    except Exception as ex:
        # A failed listing is logged and treated as "no items found".
        LOG.exception(ex)
    if len(dir_items) > 0:
        raise FuseOSError(errno.ENOTEMPTY)
    try:
        self.__backend.rmdir_object([path.rstrip("/")], _opts)
    except Exception as ex:
        LOG.exception(ex)
        pass
    # Now remove the directory from the _SEGMENT_PREFIX tree
    # only if it is empty
    seg_path = os.path.join(_SEGMENT_PREFIX, path.lstrip("/"))
    try:
        dir_items += self.__backend.list_objects(seg_path, max_items=2)
    except Exception as ex:
        LOG.exception(ex)
    if len(dir_items) == 0:
        try:
            self.__backend.rmdir_object([seg_path.rstrip("/")], _opts)
        except Exception as ex:
            LOG.exception(ex)
            pass
    # Best-effort removal of the local segment cache directory.
    try:
        segment_cache_path = self._get_segment_cache(path)
        if os.path.isdir(segment_cache_path):
            shutil.rmtree(segment_cache_path)
    except BaseException:
        pass
    # Best-effort removal of the local cache directory.
    try:
        cache_path = self._get_cache(path)
        # Check to make sure that the directory exists, otherwise
        # skip the removal.
        if os.path.isdir(cache_path):
            shutil.rmtree(cache_path)
    except BaseException:
        pass
    return
def chmod(self, path, mode):
    """Change the mode of the local cache entry for ``path``.

    Best effort: every failure is silently ignored.

    Args:
        path (str): Object path.
        mode (int): New permission bits.

    Returns:
        None.
    """
    LOG.debug(("chmod, %s" % path))
    try:
        head, tail = self.split_head_tail(path)
        return os.chmod(self._get_cache(os.path.join(head, tail)), mode)
    except BaseException:
        return None
def chown(self, path, uid, gid):
    """Change the owner of the local cache entry for ``path``.

    Best effort: every failure is silently ignored.

    Args:
        path (str): Object path.
        uid (int): New owner user id.
        gid (int): New owner group id.

    Returns:
        None.
    """
    LOG.debug(("chown, %s" % path))
    try:
        head, tail = self.split_head_tail(path)
        return os.chown(self._get_cache(os.path.join(head, tail)), uid, gid)
    except BaseException:
        return None
def symlink(self, name, target):
    """Reject symbolic link creation; it is not supported here.

    Args:
        name (str): Path of the link that was requested.
        target (str): Path the link would have pointed to.

    Raises:
        FuseOSError: Always, with ``EPERM``, the errno symlink(2) defines
            for a filesystem that does not support symbolic links.
    """
    raise FuseOSError(errno.EPERM)
def rename(self, old, new):
    """Rename an object by rewriting manifests; segments are not copied.

    Steps, in order:
      1. Refuse while ``old`` is still in the in-memory manifest (after
         one 30 second wait).
      2. Remove any stale ``new~`` backup manifest.
      3. If ``new`` and ``old`` already share a segments directory, just
         drop the ``old`` manifest and return.
      4. If ``new`` exists, save its manifest as ``new~`` so its segments
         stay referenced while ``new`` is overwritten.
      5. Write the ``old`` manifest under ``new`` and drop ``old``.
      6. Finally delete ``new~`` together with its segments.

    Args:
        old (str): Current object path.
        new (str): Target object path.

    Returns:
        int: 0 on success.

    Raises:
        AssertionError: If ``new`` is present in the in-memory manifest.
        Exception: If ``old`` is still in use after the wait, or if
            reading or writing a manifest fails.
    """
    assert self.manifest.get(new, None) is None
    if self.manifest.get(old, None):
        # Give the current user of `old` a chance to finish.
        time.sleep(30)
        if self.manifest.get(old, None):
            raise Exception("%s in use. Try again" % old)
    LOG.debug(("rename, %s -> %s" % (old, new)))
    # If the new file exists, then we leak the existing segments of the
    # "new" file if rename is called. We need to carefully clean up the
    # new file's segments.
    # First make a copy of the "new" manifest so the backup file is still
    # visible in the fuse mount if we were to delete the file later.
    # Without the bak file, we may lose any references to the new
    # segments if fuse crashes while the rename operation is performed.
    #
    newbak_name = new + '~'
    try:
        # remove any reference to backup file. May happen
        # if last rename operation failed in the middle or
        # fuse mount crashed
        self.object_unlink(newbak_name, leave_segments=True)
    except Exception as ex:
        LOG.exception(ex)
    try:
        new_metadata = self._get_object_metadata(new)
        old_metadata = self._get_object_metadata(old)
        # Same segments directory: both names already refer to the same
        # data, so only the `old` manifest has to go.
        if new_metadata.get('segments-dir', new + '-segments') == \
                old_metadata.get('segments-dir', old + '-segments'):
            self.object_unlink(old, leave_segments=True)
            return 0
    except Exception:
        # Metadata lookups fail when either object does not exist;
        # continue with the normal rename path.
        pass
    newfile_exists = False
    try:
        try:
            new_metadata = self._get_object_metadata(new)
        except Exception:
            # `new` does not exist: nothing to back up.
            pass
        else:
            new_manifest = self._read_object_manifest(new)
            segments_dir = new_metadata.get('segments-dir',
                                            new + '-segments')
            # Translate the stored manifest entries into the layout that
            # _write_object_manifest is given.
            newbak_manifest = []
            for man in new_manifest:
                newbak_manifest.append(
                    {"path": man['name'], "etag": man['hash'],
                     "size_bytes": man['size_bytes'],
                     "versionId": man.get('versionId', ""),
                     "content_type": man['content_type']})
            self._write_object_manifest(
                newbak_name, newbak_manifest, metadata={
                    'segments-dir': segments_dir,
                    'segment-count': new_metadata['segment-count'],
                    'total-size': new_metadata['total-size']})
            newfile_exists = True
    except BaseException as ex:
        LOG.exception(ex)
        raise
    # new and newbak refer to the same segment list now
    # make a copy of the old manifest
    try:
        old_manifest = self._read_object_manifest(old)
        old_metadata = self._get_object_metadata(old)
        segments_dir = old_metadata.get('segments-dir', old + '-segments')
        new_manifest = []
        for man in old_manifest:
            new_manifest.append(
                {
                    "path": man['name'],
                    "etag": man['hash'],
                    "size_bytes": man['size_bytes'],
                    "versionId": man.get('versionId', ""),
                    "content_type": man['content_type'],
                }
            )
        self._write_object_manifest(
            new,
            new_manifest,
            metadata={
                'segments-dir': segments_dir,
                'segment-count': old_metadata['segment-count'],
                'total-size': old_metadata['total-size']})
        # `new` now points at the data; dropping `old` is best effort.
        try:
            self.object_unlink(old, leave_segments=True)
        except Exception as ex:
            LOG.exception(ex)
    except BaseException:
        raise
    finally:
        # Whether or not the rename succeeded, remove the backup manifest
        # and the segments it references.
        if newfile_exists:
            try:
                self.object_unlink(newbak_name, leave_segments=False)
            except Exception as ex:
                LOG.exception(ex)
    return 0
def link(self, target, name):
    """Create a hard link between two entries of the local cache.

    Args:
        target (str): Path of the existing object.
        name (str): Path of the link to create.

    Returns:
        None (the result of ``os.link``).

    Raises:
        OSError: If the link cannot be created in the cache.
    """
    LOG.debug(("link, %s" % target))
    # Use the instance method, like every other caller in this class.
    container, prefix = self.split_head_tail(target)
    cache_path_target = self._get_cache(os.path.join(container, prefix))
    container, prefix = self.split_head_tail(name)
    cache_path_name = self._get_cache(os.path.join(container, prefix))
    return os.link(cache_path_target, cache_path_name)
def utimens(self, path, times=None):
    """Set access/modification times on the local cache entry.

    Best effort: failures are ignored.

    Args:
        path (str): Object path.
        times: ``(atime, mtime)`` tuple passed to ``os.utime``, or None
            for the current time.

    Returns:
        int: Always 0.
    """
    LOG.debug(("utimens, %s" % path))
    cache_path = self._get_cache(path)
    # This file won't exist until it has been downloaded for
    # writing/reading, so a failure here is not an error.
    try:
        os.utime(cache_path, times)
    except Exception:
        pass
    return 0
def destroy(self, path):
    """Filesystem teardown hook; currently performs no cleanup.

    Args:
        path (str): Not used.

    Returns:
        int: Always 0.
    """
    # Earlier drafts exited the backend's per-operation contexts
    # (list/stat/upload/download/delete/mkdir, st_cap) and removed the
    # cache root here. All of that is disabled.
    # Not sure we need this - cjk
    return 0
class FuseCache(object):
    """Per-handle segment cache between the FUSE layer and a repository.

    Every open file handle gets its own ``LRUCache`` of segments, keyed
    by the segment's starting byte offset. Reads and writes operate on
    cached segments; modified segments are uploaded to the repository
    when they are evicted or when the handle is flushed.
    """

    def __init__(self, root, repository):
        """Set up the cache and make sure ``root`` has the right owner.

        Args:
            root (str): Cache root directory; created when missing.
            repository: Backend repository used for all object I/O.
        """
        self.root = root
        self.repository = repository
        # object_name -> {fh -> per-handle state dict}
        self.lrucache = {}
        self.lrucache_lock = threading.Lock()
        self.user_id = getpwnam(FUSE_USER).pw_uid
        self.group_id = getpwnam(FUSE_USER).pw_gid
        owner = [str(self.user_id) + ':' + str(self.group_id)]
        if not os.path.isdir(self.root):
            fs.mkdir(self.root)
            fs.chown(self.root, owner)
        else:
            stat_info = os.stat(self.root)
            if stat_info.st_uid != self.user_id or \
                    stat_info.st_gid != self.group_id:
                fs.chown(CONF.vault_data_directory_old, owner)

    def object_open(self, object_name, flags):
        """Open an object and create the cache state for the new handle.

        Args:
            object_name (str): Path of the object.
            flags (int): Open flags passed to the repository.

        Returns:
            The file handle returned by the repository.
        """
        fh = self.repository.object_open(object_name, flags)
        with self.lrucache_lock:
            self.lrucache.setdefault(object_name, {})
            item = self.lrucache[object_name].pop(fh, None)
            # A handle must never be handed out twice.
            assert item is None
            self.lrucache[object_name].setdefault(fh, {
                'lrucache': LRUCache(maxsize=CONF.vault_cache_size),
                'lrulock': threading.Lock(),
                'object_name': object_name,
                'writes_before_update_manifest': NUMBER_UPLOADS_BEFORE_FLUSH,
            })
        return fh

    def object_flush(self, object_name, fh, update_manifest=True):
        """Upload all modified cached segments of a handle.

        The handle's cache is emptied in the process.

        Args:
            object_name (str): Path of the object.
            fh: File handle returned by ``object_open``.
            update_manifest (bool): When True the repository's
                ``_object_close`` is called after the uploads.
        """
        LOG.debug(
            'Cache object_flush [%s] [%x]'
            % (object_name, threading.current_thread().ident)
        )
        state = self.lrucache[object_name][fh]
        assert state['object_name'] == object_name
        cache = state['lrucache']
        with state['lrulock']:
            # popitem() raises KeyError once the cache is empty; that
            # ends the loop.
            # NOTE(review): a KeyError raised by object_upload is also
            # swallowed here and stops the flush early — confirm that is
            # acceptable.
            try:
                while True:
                    off, entry = cache.popitem()
                    if entry['modified']:
                        self.repository.object_upload(
                            object_name, off, entry['data'], fh
                        )
            except KeyError:
                pass
            if update_manifest:
                self.repository._object_close(object_name, fh)

    def object_close(self, object_name, fh):
        """Flush a handle, drop its cache state and close it.

        Args:
            object_name (str): Path of the object.
            fh: File handle returned by ``object_open``.
        """
        LOG.debug(
            'Cache object_close [%s] [%x]'
            % (object_name, threading.current_thread().ident)
        )
        self.object_flush(object_name, fh, update_manifest=False)
        with self.lrucache_lock:
            self.lrucache[object_name].pop(fh, None)
            if not len(self.lrucache[object_name]):
                self.lrucache.pop(object_name, None)
        self.repository.object_close(object_name, fh)
        return

    def object_truncate(self, object_name, length, fh=None):
        """Forward a truncate request to the repository."""
        self.repository.object_truncate(object_name, length, fh)

    def _walk_segments(self, offset, length):
        """Split a byte range into per-segment pieces.

        Args:
            offset (int): Start of the range within the object.
            length (int): Number of bytes in the range.

        Yields:
            tuple: ``(seg_offset, base, seg_len)`` — the segment's start
            offset, the position inside the segment, and the number of
            bytes of the range that fall into that segment.
        """
        while length != 0:
            seg_offset = (
                offset // CONF.vault_segment_size * CONF.vault_segment_size
            )
            base = offset - seg_offset
            seg_len = min(length, CONF.vault_segment_size - base)
            yield seg_offset, base, seg_len
            offset += seg_len
            length -= seg_len

    def _make_room(self, object_name, fh, cache):
        """Evict one segment when a handle's cache is full.

        A modified victim is uploaded first. After every
        ``NUMBER_UPLOADS_BEFORE_FLUSH`` such uploads the repository's
        ``_object_close`` is called and the counter is reset.
        """
        if len(cache) != CONF.vault_cache_size:
            return
        # cache overflow: kick an item so we can accommodate a new one
        off, victim = cache.popitem()
        if not victim['modified']:
            return
        self.repository.object_upload(object_name, off, victim['data'], fh)
        state = self.lrucache[object_name][fh]
        state['writes_before_update_manifest'] -= 1
        if not state['writes_before_update_manifest']:
            self.repository._object_close(object_name, fh)
            state['writes_before_update_manifest'] = \
                NUMBER_UPLOADS_BEFORE_FLUSH

    def object_read(self, object_name, length, offset, fh):
        """Read a byte range through the handle's segment cache.

        Args:
            object_name (str): Path of the object.
            length (int): Number of bytes requested.
            offset (int): Start offset within the object.
            fh: File handle returned by ``object_open``.

        Returns:
            bytes: The data read, or the integer 0 when a needed segment
            could not be fetched (treated as end of file).
        """
        state = self.lrucache[object_name][fh]
        assert state['object_name'] == object_name
        output_buf = bytearray()
        with state['lrulock']:
            for segoffset, base, seg_len in self._walk_segments(
                offset, length
            ):
                try:
                    segdata = self.lrucache[object_name][
                        fh]['lrucache'][segoffset]['data']
                except KeyError:
                    try:
                        # cache miss. free up a cache slot
                        cache = self.lrucache[object_name][fh]['lrucache']
                        self._make_room(object_name, fh, cache)
                        segdata = self.repository.object_download(
                            object_name, segoffset, fh
                        )
                        cache[segoffset] = {
                            'modified': False,
                            'data': segdata,
                        }
                    except Exception as ex:
                        # end of file
                        LOG.exception(ex)
                        return 0
                output_buf += segdata[base:base + seg_len]
        return bytes(output_buf)

    def object_write(self, object_name, buf, offset, fh):
        """Write a buffer into the handle's segment cache.

        Segments are modified in memory and marked dirty; nothing is
        uploaded here except segments evicted to make room.

        Args:
            object_name (str): Path of the object.
            buf (bytes): Data to write.
            offset (int): Start offset within the object.
            fh: File handle returned by ``object_open``.

        Returns:
            int: ``len(buf)``.
        """
        LOG.debug(
            'Cache object_write [%s] [%x]'
            % (object_name, threading.current_thread().ident)
        )
        length = len(buf)
        state = self.lrucache[object_name][fh]
        assert state['object_name'] == object_name
        bufptr = 0
        with state['lrulock']:
            for segoffset, base, seg_len in self._walk_segments(
                offset, length
            ):
                cache = self.lrucache[object_name][fh]['lrucache']
                if segoffset not in cache:
                    self._make_room(object_name, fh, cache)
                    # The write may not start on a segment boundary, so
                    # read the segment before modifying it.
                    try:
                        # populate cache
                        segdata = self.repository.object_download(
                            object_name, segoffset, fh
                        )
                        if segdata is None:
                            raise Exception("Object not found")
                        cache[segoffset] = {
                            'modified': False, 'data': segdata}
                    except Exception:
                        # we have not written the segment to the
                        # repository yet
                        segdata = bytearray(1)
                        cache[segoffset] = {
                            'modified': True, 'data': segdata}
                else:
                    segdata = cache[segoffset]['data']
                if len(segdata) < base:
                    # Zero-fill the gap up to the end of this write.
                    segdata.extend(b'\0' * (base + seg_len - len(segdata)))
                segdata[base:base + seg_len] = buf[bufptr:bufptr + seg_len]
                cache[segoffset]['modified'] = True
                bufptr += seg_len
        return len(buf)

    # The directory helpers below act on the repository's local path for
    # an object, obtained from repository._full_path().

    def mkdir(self, path, mode):
        """Create the local directory for ``path``."""
        return os.mkdir(self.repository._full_path(path), mode)

    def rmdir(self, path):
        """Remove the local directory for ``path``."""
        full_path = self.repository._full_path(path)
        return os.rmdir(full_path)

    def chmod(self, path, mode):
        """Change the mode of the local entry for ``path``."""
        full_path = self.repository._full_path(path)
        return os.chmod(full_path, mode)

    def chown(self, path, uid, gid):
        """Change the owner of the local entry for ``path``."""
        full_path = self.repository._full_path(path)
        return os.chown(full_path, uid, gid)
class TrilioVault(Operations):
    """FUSE ``Operations`` implementation backed by an object repository.

    Namespace and metadata calls are forwarded to the repository; reads,
    writes and handle lifecycle go through a ``FuseCache``.
    """

    def __init__(self, root, repository=None):
        """Create the filesystem.

        Args:
            root (str): Cache root directory.
            repository: Repository to use; a ``BackendRepository`` for
                ``root`` is created when omitted.
        """
        self.root = root
        self.repository = repository or BackendRepository(root)
        self.cache = FuseCache(root, self.repository)

    # Filesystem methods
    # ==================
    def access(self, path, mode):
        """Forward the access check to the repository."""
        self.repository.object_access(path, mode)

    def chmod(self, path, mode):
        """Forward chmod to the repository."""
        return self.repository.chmod(path, mode)

    def chown(self, path, uid, gid):
        """Forward chown to the repository."""
        return self.repository.chown(path, uid, gid)

    @disable_logging
    def getattr(self, path, fh=None):
        """Return the attribute dictionary for ``path``."""
        return self.repository.object_getattr(path, fh)

    def setxattr(self, path, name, value, size):
        """Set an extended attribute; ``size`` is not used."""
        return self.repository.object_setxattr(path, name, value)

    def getxattr(self, path, name):
        """Return an extended attribute value as bytes.

        Attributes whose name starts with "security" or "system" are
        answered with an empty value without consulting the repository.
        """
        if name.startswith("security") or name.startswith("system"):
            return bytes("", 'utf-8')
        return self.repository.object_getxattr(path, name)

    def listxattr(self, path):
        """List the extended attribute names of ``path``."""
        return self.repository.object_listxattr(path)

    def removexattr(self, path, name):
        """Remove an extended attribute from ``path``."""
        return self.repository.object_removexattr(path, name)

    def readdir(self, path, fh):
        """Yield '.', '..' and then the entries of the directory."""
        dirents = ['.', '..']
        dirents.extend(self.repository.object_readdir(path, fh))
        for r in dirents:
            yield r

    def readlink(self, path):
        """Return the target of a symbolic link in the local tree.

        Absolute targets are returned relative to ``self.root``.
        """
        # _full_path lives on the repository, not on this class.
        pathname = os.readlink(self.repository._full_path(path))
        if pathname.startswith("/"):
            # Path name is absolute, sanitize it.
            return os.path.relpath(pathname, self.root)
        else:
            return pathname

    def mknod(self, path, mode, dev):
        """Create a filesystem node at the repository's local path."""
        return os.mknod(self.repository._full_path(path), mode, dev)

    def rmdir(self, path):
        """Forward rmdir to the repository."""
        return self.repository.rmdir(path)

    def mkdir(self, path, mode):
        """Forward mkdir to the repository."""
        return self.repository.mkdir(path, mode)

    def statfs(self, path):
        """Return filesystem statistics from the repository."""
        return self.repository.object_statfs(path)

    def unlink(self, path):
        """Delete an object through the repository."""
        return self.repository.object_unlink(path)

    def symlink(self, name, target):
        """Forward symlink to the repository."""
        return self.repository.symlink(name, target)

    def rename(self, old, new):
        """Forward rename to the repository."""
        return self.repository.rename(old, new)

    def link(self, target, name):
        """Forward link to the repository."""
        return self.repository.link(target, name)

    def utimens(self, path, times=None):
        """Forward utimens to the repository."""
        return self.repository.utimens(path, times)

    # File methods
    # ============
    def open(self, path, mode):
        """Open ``path`` through the cache and return the file handle."""
        return self.cache.object_open(path, mode)

    def create(self, path, mode, fi=None):
        """Create ``path`` by opening it with ``os.O_CREAT``.

        ``mode`` and ``fi`` are not used.
        """
        return self.open(path, os.O_CREAT)

    def read(self, path, length, offset, fh):
        """Read ``length`` bytes at ``offset`` through the cache."""
        buf = self.cache.object_read(path, length, offset, fh)
        return buf

    def write(self, path, buf, offset, fh):
        """Write ``buf`` at ``offset`` through the cache."""
        return self.cache.object_write(path, buf, offset, fh)

    def truncate(self, path, length, fh=None):
        """Forward truncate to the cache."""
        self.cache.object_truncate(path, length, fh=fh)

    def flush(self, path, fh):
        """Flush the cached segments of a handle."""
        self.cache.object_flush(path, fh)

    def release(self, path, fh):
        """Close a handle through the cache."""
        self.cache.object_close(path, fh)
        return

    # def fsync(self, path, fdatasync, fh):
    # return self.cache.object_flush(path, fh)

    def destroy(self, path):
        """Forward filesystem teardown to the repository."""
        LOG.debug("destroy, %s" % path)
        return self.repository.destroy(path)
def fuse_conf(conf_file="/etc/fuse.conf"):
    """Ensure ``user_allow_other`` is enabled in the FUSE config file.

    The option counts as enabled when some line contains
    ``user_allow_other`` and no ``#``. Otherwise the option is appended
    (or the file is created with just that option): the new content is
    written to a private temporary file, copied over ``conf_file`` with
    the ``fs`` helper, and ``conf_file`` is then chowned to root:root.

    Args:
        conf_file (str): Path of the FUSE configuration file.
    """
    import tempfile

    contents = ''
    if os.path.exists(conf_file):
        with open(conf_file, 'rt') as f:
            contents = f.read()
        for line in contents.split('\n'):
            if 'user_allow_other' in line and '#' not in line:
                # Already enabled; nothing to do.
                return
        contents += '\n'
    contents += 'user_allow_other \n'

    # A unique temporary file avoids the predictable /tmp name, which
    # another local user could pre-create or replace with a symlink.
    fd, tmp_path = tempfile.mkstemp(prefix='fuse.conf.', suffix='.tmp')
    try:
        with os.fdopen(fd, 'wt') as outf:
            outf.write(contents)
        # mkstemp creates the file 0600; keep a newly created config
        # file world-readable.
        os.chmod(tmp_path, 0o644)
        fs.copy(tmp_path, conf_file)
    finally:
        fs.remove(tmp_path)
    # Apply ownership to the file that was actually written.
    fs.chown(conf_file, ['root:root'])
def main(mountpoint, cacheroot):
    """Prepare the mount point and cache root, then run the FUSE mount.

    Preparation is best effort: any failure is logged and the mount is
    still attempted. The call blocks inside ``FUSE`` (foreground mode).

    Args:
        mountpoint (str): Directory where the filesystem is mounted.
        cacheroot (str): Root directory of the local cache.
    """
    owner = FUSE_USER + ':' + FUSE_GROUP
    try:
        # Lazily unmount anything left over from a previous run.
        try:
            fs.umount(mountpoint, ['-l'])
        except Exception:
            pass
        if os.path.isdir(mountpoint):
            # Check if mount point is already having the required ownership
            mountpoint_stat = os.stat(mountpoint)
            user_id = getpwnam(FUSE_USER).pw_uid
            # Resolve the group through the group database; getpwnam
            # only knows user names.
            group_id = grp.getgrnam(FUSE_GROUP).gr_gid
            if not (mountpoint_stat.st_uid == user_id and
                    mountpoint_stat.st_gid == group_id):
                fs.chown(mountpoint, ['-R', owner])
        else:
            fs.mkdir(mountpoint, ['-p'])
        fs.mkdir(cacheroot, ['-p'])
        # give a permission to mountpoint root
        fs.chown(mountpoint, [owner])
        # give a permission to cacheroot
        fs.chown(cacheroot, [owner])
    except Exception as ex:
        LOG.exception(ex)

    # clear /var/triliovault directory
    for entry in os.listdir(CONF.vault_data_directory_old):
        shutil.rmtree(
            os.path.join(CONF.vault_data_directory_old, entry),
            ignore_errors=True,
        )

    tvaultplugin = TrilioVault(
        cacheroot, repository=BackendRepository(cacheroot)
    )
    # FUSE runs single-threaded unless the configuration asks otherwise.
    disable_fuse_threads = True
    if CONF.vault_threaded_filesystem.lower() == 'true':
        disable_fuse_threads = False
    FUSE(
        tvaultplugin,
        mountpoint,
        nothreads=disable_fuse_threads,
        foreground=True,
        nonempty=True,
        big_writes=True,
        allow_other=True,
    )
# Driver function
def run():
    """Main routine that drives fuse mount.

    Loads the configuration, makes sure the FUSE config allows
    ``user_allow_other``, validates the S3 client and retention mode, and
    then mounts the filesystem.
    """
    option_config()
    # The work queue must be deeper than the number of workers.
    assert CONF.queue_depth > CONF.worker_pool_size
    fuse_conf()

    backend = vaults3.S3Backend(options)
    backend.validate_s3_client(options)
    backend.set_retention_mode(options)

    main(CONF.vault_data_directory, CONF.vault_data_directory_old)
# Run the FUSE driver when this module is executed as a script.
if __name__ == '__main__':
    run()