# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2017 Trilio Data, Inc.
# All Rights Reserved.
""" Trilio S3 Backend implimentation
This module contains the back end implimentation of all of all S3 specific
support.
"""
# Disable pylint unused-argument checking because this is an interface
# implementation file and not all method arguments would be used.
# pylint: disable=unused-argument
import datetime
import time
import json
import os
from oslo_log import log as logging
# AWS S3 API... why we're doing all this.
import boto3
import botocore
from botocore.exceptions import ClientError
from s3fuse.utils import get_retention_mode
BASENAME = 's3'
MAX_S3_PAGE_SIZE = 1000
_SEGMENT_PREFIX = "80bc80ff-0c51-4534-86a2-ec5e719643c2"
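# Assumed meaning, inferred from its use in list_objects() below: this UUID is
# the well-known prefix of the segment directory tree, and entries under it
# are filtered out of normal directory listings.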
DEFAULT_MAX_POOL_SIZE = 500
LOG = logging.getLogger(__name__)
def _make_timestamp(modified_time):
""" Utility function used to convert a datetime to an OS timestamp.
Args:
modified_time (datetime): Datatime object to convert to a Unix Epoc timestamp.
Returns:
The value of modified_time as a timestamp.
"""
naive_time = modified_time.replace(tzinfo=None)
delta_seconds = (
naive_time - datetime.datetime(1970, 1, 1)).total_seconds()
return delta_seconds
class S3Backend(object):
""" S3 Backend implimentation.
A Wrapper for the AWS S3 boto3 and botocore API. This class encapsulates all S3
operations and exposes them as a backend storage instance.
"""
def __init__(self, options):
config_object = None
s3_client = None
self.options = options
MAX_POOL_SIZE = int(options.get("vault_s3_max_pool_connections")) if options.get(
"vault_s3_max_pool_connections") else DEFAULT_MAX_POOL_SIZE
if options['s3_signature'] != 'default':
config_object = botocore.client.Config(signature_version=options['s3_signature'],
read_timeout=int(options['s3_read_timeout']),
max_pool_connections=MAX_POOL_SIZE)
else:
config_object = botocore.client.Config(read_timeout=int(options['s3_read_timeout']),
max_pool_connections=MAX_POOL_SIZE)
if options['s3_ssl_cert'] != '':
s3_client = boto3.client('s3',
region_name=options['os_options']['region_name'],
aws_access_key_id=options['user'],
aws_secret_access_key=options['key'],
endpoint_url=options['os_options']['object_storage_url'],
config=config_object, verify=options['s3_ssl_cert'])
else:
s3_client = boto3.client('s3',
region_name=options['os_options']['region_name'],
aws_access_key_id=options['user'],
aws_secret_access_key=options['key'],
endpoint_url=options['os_options']['object_storage_url'],
config=config_object)
def set_connection_header(request, operation_name, **kwargs):
request.headers['Connection'] = 'keep-alive'
s3_client.meta.events.register('request-created.s3', set_connection_header)
self.__client = s3_client
self.__bucket_name = options['bucket']
self.__retention_mode = options['retention_mode']
def set_retention_mode(self, options):
# Get the retention mode if not set already.
options['retention_mode'] = get_retention_mode(self.__client, options['bucket'], options['bucket_object_lock'])
self.__retention_mode = options['retention_mode']
def validate_s3_client(self, options):
""" Validates the S3 credentials / permission on the provided bucket.
It also validates against s3 object lock configuration and if the s3 supports
more than 255 object name length
Args:
self : S3 Backend object
Returns:
Does not return anything, But raises exception if validation fails.
"""
try:
self.__client.head_bucket(Bucket=self.__bucket_name)
try:
obj_lock_cfg = self.__client.get_object_lock_configuration(Bucket=self.__bucket_name)
if options['bucket_object_lock'] and \
obj_lock_cfg["ObjectLockConfiguration"]["ObjectLockEnabled"] != "Enabled":
raise Exception("Configration says object lock enabled on the bucket, "
"but s3 API does not agree with it. %s" % obj_lock_cfg)
if not options['bucket_object_lock'] and \
obj_lock_cfg["ObjectLockConfiguration"]["ObjectLockEnabled"] == "Enabled":
raise Exception("Configration says object lock enabled on the bucket, "
"but s3 API does not agree with it. %s" % obj_lock_cfg)
if options['bucket_object_lock']:
if not options['use_manifest_suffix']:
raise Exception("'use_manifest_suffix' must be set to True when "
"'bucket_object_lock' is set to True")
except Exception as ex:
if options['bucket_object_lock']:
raise
# Add a check to see if the current object store will support
# our path length.
if not options['bucket_object_lock']:
long_key = os.path.join(
'tvault_config/',
'workload_f5190be6-7f80-4856-8c24-149cb40500c5/',
'snapshot_f2e5c6a7-3c21-4b7f-969c-915bb408c64f/',
'vm_id_e81d1ac8-b49a-4ccf-9d92-5f1ef358f1be/',
'vm_res_id_72477d99-c475-4a5d-90ae-2560f5f3b319_vda/',
'deac2b8a-dca9-4415-adc1-f3c6598204ed-segments/',
'0000000000000000.00000000')
self.__client.put_object(
Bucket=self.__bucket_name, Key=long_key, Body='Test Data')
self.__client.delete_object(Bucket=self.__bucket_name, Key=long_key)
except:
raise
def __delete_object_list(self, object_list):
""" Utility method that takes a list of objects and puts it in the correct format
for the S3 delete_objects() API.
Args:
object_list (list): A list of objects with the correct S3 path.
"""
try:
# Call delete_objects once for every 1,000 objects since it has a limit
# of 1,000 objects at a time.
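            # For example, 2,500 keys result in three delete_objects() calls
            # covering 1,000, 1,000 and 500 keys respectively.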
while len(object_list) > 0:
object_delete_list = []
list_part = object_list[:MAX_S3_PAGE_SIZE]
for obj in list_part:
# S3 object names cannot start with "/"
if obj.startswith("/"):
obj = obj[1:]
object_delete_list.append({'Key': obj})
try:
self.__client.delete_objects(Bucket=self.__bucket_name,
Delete={'Objects': object_delete_list})
except:
for obj in object_delete_list:
self.__client.delete_object(Bucket=self.__bucket_name,
Key=obj['Key'])
object_list = object_list[MAX_S3_PAGE_SIZE:]
except ClientError:
raise
def __delete_object_tree(self, object_tree):
""" Utility method used to perform a rmdir operation on an object hierarchy
Args:
object_tree (str): Object/Tree path.
"""
try:
object_list = []
# The list_objects API is limited to 1,000 objects so we need to
# use a paginator.
list_paginator = self.__client.get_paginator('list_objects')
page_iterator = list_paginator.paginate(
Bucket=self.__bucket_name, Prefix=object_tree)
for objects in page_iterator:
if 'Contents' in objects and objects.get('KeyCount', len(objects['Contents'])) > 0:
for obj in objects['Contents']:
object_list.append(obj['Key'])
self.__delete_object_list(object_list)
except ClientError:
raise
def delete_object_list(self, object_list, options):
""" Delete a list of objects.
Args:
object_list (list): List of objects. Each with a full object path
in correct format.
            options (dict): Dictionary of configuration options.
"""
self.__delete_object_list(object_list)
def delete_object(self, args, options):
""" Delete an object from the S3 object store.
The object and any segments stored in the segment directory will be
removed.
Args:
args (list): List of object name parts.
            options (dict): Dictionary of configuration options.
"""
try:
if not args:
return
object_name = '/'.join(args)
# S3 object names cannot start with "/"
if object_name.startswith("/"):
object_name = object_name[1:]
if '-segments' in object_name:
# Just delete a single segment.
self.__client.delete_object(Bucket=self.__bucket_name,
Key=object_name)
else:
manifest_prefix = object_name.strip('/') + ".manifest"
manifests = self.list_objects(manifest_prefix, strict_path_filter=True)
if manifests:
manifest_name = sorted(manifests, reverse=True)[0]
else:
manifest_name = manifest_prefix
self.__client.delete_object(Bucket=self.__bucket_name,
Key=manifest_name)
except ClientError:
raise
def __wait_for_object(self, object_name, retries=1):
""" Utility method used to wait for a S3 object to become available.
This routine will keep performing a head_object() request every second
for "retries" number of times. This was added to work around any potential
eventual consistancey issues when uploading to AWS.
Args:
object_name (str): Name of the object.
retries (int): Optional parameter that defaults to 1.
Returns:
Returns when the object is available or ClientError not found exception.
"""
# Make sure retries is at least 1
retries = max(1, retries)
try:
for retry in range(0, retries):
try:
self.__client.head_object(Bucket=self.__bucket_name,
Key=object_name)
return
except ClientError as error:
if (error.response['ResponseMetadata']['HTTPStatusCode'] == 404 and
retry + 1 != retries):
time.sleep(1)
continue
raise
except ClientError:
raise
def get_object_manifest(self, object_name):
""" Download and return the object manifest as a json array.
Args:
object_name (str): Name of the object.
Returns:
Object manifest as a dictionary.
"""
try:
manifest_prefix = object_name.strip('/') + ".manifest"
if self.options['use_manifest_suffix']:
if self.options['bucket_object_lock']:
manifests = self.__client.list_object_versions(
Bucket=self.__bucket_name,
Prefix=manifest_prefix)
else:
manifests = self.__client.list_objects(
Bucket=self.__bucket_name,
Prefix=manifest_prefix)
if manifests:
manifest_name = manifest_prefix
manifests = manifests.get('Versions', []) or manifests.get('Contents', [])
if self.options['bucket_object_lock']:
for manifest in manifests:
resp = self.__client.get_object(
Bucket=self.__bucket_name, Key=manifest['Key'],
VersionId=manifest.get('VersionId', ""))
if resp['ResponseMetadata']['HTTPHeaders'].\
get('x-amz-meta-stamp-trilio-authenticity', "False") == "True":
return json.loads(resp['Body'].read(resp['ContentLength']))
manifest_name = manifests[-1]['Key']
else:
manifest_name = manifest_prefix
if not manifest_name:
raise Exception("Manifest object with "
"'x-amz-meta-stamp-trilio-authenticity' "
"attribute not found")
resp = self.__client.get_object(
Bucket=self.__bucket_name, Key=manifest_name)
return json.loads(resp['Body'].read(resp['ContentLength']))
except ClientError as error:
if error.response['ResponseMetadata']['HTTPStatusCode'] != 404:
raise
else:
try:
self.__wait_for_object(manifest_name, 10)
resp = self.__client.get_object(Bucket=self.__bucket_name,
Key=manifest_name)
return json.loads(resp['Body'].read(resp['ContentLength']))
except ClientError:
raise
def delete_object_manifest(self, object_name):
""" Download and return the object manifest as a json array.
Args:
object_name (str): Name of the object.
Returns:
Object manifest as a dictionary.
"""
manifest_prefix = object_name.strip('/') + ".manifest"
if self.options['bucket_object_lock']:
resp = self.__client.list_object_versions(
Bucket=self.__bucket_name,
Prefix=manifest_prefix)
else:
resp = self.__client.list_objects(
Bucket=self.__bucket_name,
Prefix=manifest_prefix)
if resp:
manifests = resp.get('Versions', []) or resp.get('Contents', [])
objs_to_delete = []
for man in manifests:
objs_to_delete.append({'Key': man['Key']})
if objs_to_delete:
try:
self.__client.delete_objects(Bucket=self.__bucket_name,
Delete={'Objects': objs_to_delete})
except:
for obj in objs_to_delete:
self.__client.delete_object(Bucket=self.__bucket_name,
Key=obj['Key'])
else:
raise Exception("Manifest for %s is not found" % object_name)
def download_object(self, args, options):
""" Download a file from the S3 object store.
Args:
args (list): List of object name parts.
options (dic): Dictionary of configuration options.
Returns:
On success, the contents of the object are downloaded to file identidfied
by options.out_file.
"""
try:
object_name = '/'.join(args)
extra_args = {}
if options.version_id:
extra_args = { 'VersionId': options.version_id }
self.__client.download_file(
self.__bucket_name, object_name, options.out_file, extra_args)
except ClientError:
raise
except Exception:
raise
def get_object(self, args, options):
""" Get object from the S3 object store in buffer.
Args:
args (list): List of object name parts.
options (dic): Dictionary of configuration options.
Returns:
On success, the contents of the object are downloaded to file identidfied
by options.out_file.
"""
try:
object_name = '/'.join(args)
if options.version_id:
response = self.__client.get_object(
Bucket=self.__bucket_name,
Key=object_name,
VersionId=options.version_id)
else:
response = self.__client.get_object(
Bucket=self.__bucket_name,
Key=object_name)
return response["Body"].read()
except ClientError:
raise
except Exception:
raise
def put_object(self, args, buf, buf_len, options):
""" Put an object to the S3 object store.
Args:
args (list): List of object name parts.
buf: Data to upload in object.
buf_len: Length of data to upload in object.
            options (dict): Dictionary of configuration options.
"""
try:
# First check the path_valid flag to see if this is not the first segment.
# Note - Added this for performance reasons when I was tuning. Might be overkill if it becomes problematic.
if options.path_valid is False:
self.__create_folder(os.path.dirname(args[0] + '/' + options['object_name']),
options)
if options.object_name is not None:
resp = self.__client.put_object(Bucket=self.__bucket_name,
Key=args[0] + '/' + options['object_name'],
ContentLength=buf_len,
Body=buf,
ContentType='application/octet-stream')
# response object contains etag, version id and any necessary fields
# so we avoid calling head object.
return resp
except ClientError:
raise
def list_objects(self, prefix, max_items=None, strict_path_filter=False):
""" Return a list of objects based on the provided prefix.
Used to generate a directory listing based on a prefix path
constructed from the object parts in prefix
Args:
prefix (string): prefix of the object path. If the prefix is terminated
by "/", then it lists the contents in the "subdir". If the prefix
is not terminated by "/", then it lists the contents in the current
directory that matches the prefix
Ex:
case 1:
=======
prefix - /a/b/c
object store has objects - /a/b/c, /a/b/c1, /a/b/d
return values - /a/b/c and /a/b/c1
case 2:
=======
prefix - /a/b/c/
object store has objects - /a/b/c/d, /a/b/c1, /a/b/c/e
return values - /a/b/c/d and /a/b/c/e
            max_items (int): Maximum number of objects in a list query. Default is
                unlimited, but when testing for an empty directory a smaller
                number can be used to limit the query and the size of the list
                that is returned.
            strict_path_filter (bool): Defaults to False, in which case only
                manifest objects (trimmed of their ".manifest" suffix) and
                common prefixes are returned. Individual segment listings set
                this to True in order to return every matching key verbatim.
Returns:
On success a list of unique items to be used as a directory listing.
"""
object_set = set()
prefix = prefix.lstrip("/")
try:
# The list_objects API is limited to 1,000 objects so we need to
# use a paginator. Note: Page size is just a "hint" to the API.
# The API may choose to return more or fewer items per page.
page_size = MAX_S3_PAGE_SIZE
if max_items is not None and max_items < MAX_S3_PAGE_SIZE:
page_size = max_items
if self.options['bucket_object_lock']:
list_paginator = self.__client.get_paginator('list_object_versions')
else:
list_paginator = self.__client.get_paginator('list_objects')
page_iterator = list_paginator.paginate(Bucket=self.__bucket_name, # Prefix='',
Prefix=prefix,
Delimiter='/',
PaginationConfig={'PageSize': page_size})
for objects in page_iterator:
if 'Contents' in objects and objects.get('KeyCount', len(objects['Contents'])) > 0:
for item in objects['Contents']:
# Short circuit processing if we need fewer items.
if max_items is not None and len(object_set) >= max_items:
return list(object_set)
object_name = item['Key'].rstrip('/')
if strict_path_filter:
object_set.add(object_name)
elif ".manifest" in object_name:
# Trim ".manifest" and everything after it if that is in the name.
object_set.add(object_name.split(".manifest", 1)[0])
if 'Versions' in objects and objects.get('KeyCount', len(objects['Versions'])) > 0:
for item in objects['Versions']:
# Short circuit processing if we need fewer items.
if max_items is not None and len(object_set) >= max_items:
return list(object_set)
object_name = item['Key'].rstrip('/')
if strict_path_filter:
object_set.add(object_name)
elif ".manifest" in object_name:
# Trim ".manifest" and everything after it if that is in the name.
object_set.add(object_name.split(".manifest", 1)[0])
if 'CommonPrefixes' in objects:
for object_prefix in objects['CommonPrefixes']:
# Short circuit processing if we need fewer items.
if max_items is not None and len(object_set) >= max_items:
return list(object_set)
# Filter out the segment directory tree.
if prefix.startswith(_SEGMENT_PREFIX) or \
not object_prefix['Prefix'].startswith(_SEGMENT_PREFIX):
if prefix.endswith("/"):
object_set.add(object_prefix['Prefix'].strip('/'))
#object_set.add(object_prefix['Prefix'].split(prefix.rstrip("/"))[1].strip('/'))
else:
object_set.add(object_prefix['Prefix'].strip('/'))
return list(object_set)
except ClientError:
raise
def list_segments(self, args, options):
""" Returns a list of object segments based on a prefix.
Args:
args (list): List of object name parts.
            options (dict): Dictionary of configuration options.
Returns:
A list of segments in the object store that match the supplied prefix.
"""
try:
segment_list = []
segments_path = os.path.join("/".join(args), options.prefix)
object_list = self.list_objects(
segments_path, strict_path_filter=True)
return object_list
except ClientError:
raise
def __get_object_headers(self, object_name, retries=1):
""" Utility method that gets an object head from the repository with retry support.
The default is to try once. Admittedly, this is ugly because the
provided object name might be a terminal object or a "directory" so
we need to try both cases for each retry attempt. When retry > 1, there is
a 1 second delay between attempts.
Args:
object_name (str): Name of the object.
retries (int): Optional parameter, default is 1 attempt.
Returns:
Object head dictionary or Boto3 exception.
"""
# Prevent retries from being set to less than 1.
retries = max(1, retries)
try:
for retry in range(0, retries):
try:
obj_headers = self.__client.head_object(Bucket=self.__bucket_name,
Key=object_name)
return obj_headers
except ClientError as error:
if error.response['ResponseMetadata']['HTTPStatusCode'] == 404:
try:
obj_headers = self.__client.head_object(Bucket=self.__bucket_name,
Key=object_name + '/')
                            # Hack to support Cohesity: it appears to ignore the
                            # ContentType field passed to put_object.
obj_headers['ContentType'] = obj_headers.get('ContentType') or 'application/x-directory'
return obj_headers
except ClientError as error:
if error.response['ResponseMetadata']['HTTPStatusCode'] == 404:
if '-segments' not in object_name:
try:
manifest_prefix = object_name.strip('/') + ".manifest"
if self.options['bucket_object_lock']:
manifests = self.__client.list_object_versions(
Bucket=self.__bucket_name,
Prefix=manifest_prefix)
else:
manifests = self.__client.list_objects(
Bucket=self.__bucket_name,
Prefix=manifest_prefix)
manifests = manifests.get('Versions', []) or manifests.get('Contents', [])
if manifests:
manifests = sorted(manifests,
key=lambda k: k['LastModified'])
manifest_name = None
if self.options['bucket_object_lock']:
for manifest in manifests:
obj_headers = self.__client.head_object(Bucket=self.__bucket_name,
Key=manifest['Key'],
VersionId=manifest.get('VersionId', ""))
if obj_headers['ResponseMetadata']['HTTPHeaders'].\
get('x-amz-meta-stamp-trilio-authenticity', "False") == "True":
return obj_headers
manifest_name = manifests[-1]['Key']
if not manifest_name:
raise Exception("Manifest object with "
"'x-amz-meta-stamp-trilio-authenticity' "
"attribute not found")
else:
manifest_name = manifest_prefix
obj_headers = self.__client.head_object(
Bucket=self.__bucket_name,
Key=manifest_name)
return obj_headers
except ClientError as error:
if (error.response[
'ResponseMetadata'][
'HTTPStatusCode'] == 404 and
retry + 1 != retries):
time.sleep(1)
continue
raise
if retry + 1 != retries:
time.sleep(1)
continue
raise
raise
except ClientError:
raise
def stat_object(self, args, options):
""" Get "operating system like" stat data for the given object.
Args:
args (list): List of object name parts.
            options (dict): Dictionary of configuration options.
Returns:
A stat structure containing object information required by caller.
"""
stat_data = {'timestamp': 0, 'size': 0, 'etag': "", 'directory': False}
stat_data['headers'] = {}
stat_data['headers']['Metadata'] = {}
object_name = '/'.join(args)
try:
if not args:
stat_data['timestamp'] = _make_timestamp(
datetime.datetime.now())
return stat_data
else:
try:
obj_header = self.__get_object_headers(object_name)
except BaseException:
objects = self.__client.list_objects(
Bucket=self.__bucket_name,
Delimiter='/',
Prefix=object_name)
if 'CommonPrefixes' in objects:
if object_name + "/" in [x['Prefix'] for x in objects['CommonPrefixes']]:
md = datetime.datetime.strptime(objects['ResponseMetadata']['HTTPHeaders']['date'],
"%a, %d %b %Y %H:%M:%S %Z")
obj_header = {'ContentLength': 0,
'LastModified': md,
'Metadata': {}}
else:
raise
stat_data['timestamp'] = _make_timestamp(
obj_header['LastModified'])
stat_data['headers'] = obj_header
if (obj_header.get('ContentType', 'text/plain') ==
'application/x-directory' or
obj_header['ContentLength'] == 0):
stat_data['directory'] = True
# Copy the Metadata sub values into the stat structure in
# order to conform to our API.
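                # e.g. a Metadata entry 'x-object-meta-total-size': '1073741824'
                # also becomes stat_data['headers']['total-size'] = '1073741824'.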
for key, value in stat_data['headers']['Metadata'].items():
if 'x-object-meta-' in key.lower():
stat_data['headers'][
key.lower().split('x-object-meta-')[1]] = value
else:
stat_data['headers'][key.lower()] = value
if int(stat_data['headers'][
'Metadata'].get('x-object-meta-segment-count', 0)) > 0:
stat_data['size'] = stat_data['headers']['Metadata'].get(
'x-object-meta-total-size', 0)
elif int(stat_data['headers'].get('segment-count', 0)) > 0:
stat_data['size'] = stat_data['headers'].get(
'total-size', 0)
else:
stat_data['size'] = obj_header['ContentLength']
stat_data['x-account-bytes-used'] = stat_data['size']
return stat_data
except ClientError:
raise
def mkdir_object(self, args, options):
""" Create an object that represents a directory in the object store.
Args:
args (list): List of object name parts.
            options (dict): Dictionary of configuration options.
"""
new_folder = '/'.join(args)
self.__create_folder(new_folder, options)
return
def rmdir_object(self, args, options):
""" Remove an object that represents a directory in the object store.
Args:
args (list): List of object name parts.
            options (dict): Dictionary of configuration options.
"""
object_tree = '/'.join(args) + '/'
# S3 object names cannot start with "/"
if object_tree.startswith("/"):
object_tree = object_tree[1:]
self.__delete_object_tree(object_tree)
# Now remove the actual directory object.
try:
self.__client.delete_object(Bucket=self.__bucket_name,
Key=object_tree)
except BaseException:
pass
return
def upload_object_manifest(self, object_name, put_headers, manifest_data):
""" Upload a new object manifest to the object store.
Args:
object_name (str): Name of the object. A ".manifest" will be added.
            put_headers (dict): Dictionary of metadata to be
                added to the object.
            manifest_data (dict): The manifest data which becomes
                the body of the file.
"""
try:
manifest_prefix = object_name.strip('/') + ".manifest"
if self.options['use_manifest_suffix']:
manifests = self.list_objects(manifest_prefix,
strict_path_filter=True)
if manifests:
suffix = sorted(manifests, reverse=True)[0].split('.')[-1]
suffix = int(suffix, 16)
suffix = "%08x" % (suffix + 1)
else:
suffix = "%08x" % 0
manifest_name = manifest_prefix + "." + suffix
else:
manifest_name = manifest_prefix
for segment_data in manifest_data:
segment_data['name'] = segment_data.pop('path')
segment_data['hash'] = segment_data.pop('etag', 0)
manifest = json.dumps(manifest_data)
self.__client.put_object(Bucket=self.__bucket_name,
Key=manifest_name,
Metadata=put_headers,
Body=manifest,
ContentType='text/plain')
if self.options['use_manifest_suffix']:
if manifests:
objs_to_delete = []
for man in manifests:
objs_to_delete.append({'Key': man})
try:
self.__client.delete_objects(
Bucket=self.__bucket_name,
Delete={'Objects': objs_to_delete})
except BaseException:
for obj in objs_to_delete:
self.__client.delete_object(Bucket=self.__bucket_name,
Key=obj['Key'])
except ClientError:
raise
return
def update_object_attributes(self, object_name, attributes):
""" Upload a new object manifest to the object store.
Args:
object_name (str): Name of the object. A ".manifest" will be added.
attributes (dic): The attributes data which becomes
the body of file.
"""
try:
manifest_prefix = object_name.strip('/') + ".manifest"
if self.options['use_manifest_suffix']:
manifests = self.list_objects(manifest_prefix,
strict_path_filter=True)
if manifests:
suffix = sorted(manifests, reverse=True)[0].split('.')[-1]
suffix = int(suffix, 16)
suffix = "%08x" % (suffix + 1)
else:
suffix = "%08x" % 0
manifest_name = manifest_prefix + "." + suffix
else:
manifest_name = manifest_prefix
copy_src = os.path.join(self.__bucket_name, manifest_name)
self.__client.copy_object(Bucket=self.__bucket_name,
Key=manifest_name,
CopySource=copy_src,
Metadata=attributes,
MetadataDirective='REPLACE')
except ClientError:
raise
return
def put_object_segment_retention(self, object_name, retainuntil):
""" Upload a new object manifest to the object store.
Args:
object_name (str): Name of the object. A ".manifest" will be added.
put_headers (dic): Dictionary of meta data to be added
to the object.
manifest_data (dic): The manifest data which becomes the
body of file.
"""
try:
object_name = object_name.strip('/')
self.__client.put_object_retention(
Bucket=self.__bucket_name,
Key=object_name,
Retention={"Mode": self.__retention_mode,
"RetainUntilDate": retainuntil}
)
except ClientError:
raise
return
def put_manifest_retention(self, object_name, retainuntil):
""" Upload a new object manifest to the object store.
Args:
object_name (str): Name of the object. A ".manifest"
will be added. put_headers (dic): Dictionary of meta data
to be added to the object. manifest_data (dic): The manifest
data which becomes the body of file.
"""
try:
manifest_prefix = object_name.strip('/') + ".manifest"
if self.options['use_manifest_suffix']:
manifests = self.list_objects(manifest_prefix,
strict_path_filter=True)
if manifests:
suffix = sorted(manifests, reverse=True)[0].split('.')[-1]
suffix = int(suffix, 16)
suffix = "%08x" % (suffix + 1)
else:
suffix = "%08x" % 0
manifest_name = manifest_prefix + "." + suffix
else:
manifest_name = manifest_prefix
self.__client.put_object_retention(
Bucket=self.__bucket_name,
Key=manifest_name,
Retention={"Mode": self.__retention_mode,
"RetainUntilDate": retainuntil}
)
except ClientError:
raise
return
def __create_folder(self, folder_name, options):
""" Utility method used to create a object directory structure.
The entire path is split and each directory level is created
as an object if it does not exist.
Args:
folder_name (str): The entire folder name.
            options (dict): Dictionary of configuration options.
"""
try:
self.__client.head_object(
Bucket=self.__bucket_name, Key=folder_name) # + '/')
except ClientError as error:
if error.response['ResponseMetadata']['HTTPStatusCode'] == 404:
path_parts = folder_name.split('/')
new_path = ''
for part in path_parts:
if part == '':
break
new_path = new_path + part + '/'
try:
# obj_head =
self.__client.head_object(
Bucket=self.__bucket_name, Key=new_path)
except ClientError as error:
if error.response[
'ResponseMetadata'][
'HTTPStatusCode'] == 404:
try:
self.__client.put_object(
Bucket=self.__bucket_name, Key=new_path,
Body='TrilioVault directory object',
ContentType='application/x-directory')
except ClientError as error:
                            # Some backends (e.g. MinIO) raise an exception if
                            # the directory object already exists. The exception
                            # to handle looks like:
                            #   ClientError('An error occurred
                            #   (XMinioObjectExistsAsDirectory)
                            #   when calling the PutObject
                            #   operation: Object name already
                            #   exists as a directory.',)
if error.response[
'ResponseMetadata'][
'HTTPStatusCode'] != 409:
raise
continue
def upload_object(self, args, options):
""" Upload an object to the S3 object store.
Args:
args (list): List of object name parts.
            options (dict): Dictionary of configuration options.
"""
files = args[1:]
try:
# First check the path_valid flag to see if this
# is not the first segment. Note - Added this for
# performance reasons when I was tuning. Might be overkill
# if it becomes problematic.
if options.path_valid is False:
self.__create_folder(
os.path.dirname(args[0] + '/' + options['object_name']),
options)
if options.object_name is not None:
with open(files[0], 'rb') as data:
self.__client.upload_fileobj(
data, Bucket=self.__bucket_name,
Key=args[0] + '/' +
options['object_name'],
ExtraArgs={
'ContentType': 'application/octet-stream'
})
obj_headers = self.__client.head_object(
Bucket=self.__bucket_name,
Key=args[0] + '/' +
options['object_name'])
return obj_headers
# If experimenting with transfer config
# settings (a.k.a AWS chunking) include the argument
# below as the last argument in the upload_fileobj()
# call above. - cjk Config=__transfer_config
except ClientError:
raise