# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2017 Trilio Data, Inc.
# All Rights Reserved.
""" Trilio S3 Backend implimentation

    This module contains the back end implimentation of all of all S3 specific
    support.
"""

# Disable pylint unused-argument checking because this is an interface
# implementation file and not all method arguments would be used.
# pylint: disable=unused-argument


import datetime
import time
import json
import os
from oslo_log import log as logging

# AWS S3 API... why we're doing all this.
import boto3
import botocore
from botocore.exceptions import ClientError

from s3fuse.utils import get_retention_mode

BASENAME = 's3'
MAX_S3_PAGE_SIZE = 1000
_SEGMENT_PREFIX = "80bc80ff-0c51-4534-86a2-ec5e719643c2"
DEFAULT_MAX_POOL_SIZE = 500
LOG = logging.getLogger(__name__)

def _make_timestamp(modified_time):
    """ Utility function used to convert a datetime to an OS timestamp.

    Args:
        modified_time (datetime): Datatime object to convert to a Unix Epoc timestamp.

    Returns:
        The value of modified_time as a timestamp.
    """
    naive_time = modified_time.replace(tzinfo=None)
    delta_seconds = (
        naive_time - datetime.datetime(1970, 1, 1)).total_seconds()
    return delta_seconds


class S3Backend(object):
    """ S3 Backend implimentation.

    A Wrapper for the AWS S3 boto3 and botocore API. This class encapsulates all S3
    operations and exposes them as a backend storage instance.
    """

    def __init__(self, options):
        """ Initialize the S3 backend from a configuration dictionary.

        Builds a boto3 S3 client configured from the supplied options and
        registers an event hook that forces HTTP keep-alive on every
        request.

        Args:
            options (dic): Dictionary of configuration options. Must contain
                's3_signature', 's3_read_timeout', 's3_ssl_cert', 'user',
                'key', 'bucket', 'retention_mode' and an 'os_options'
                sub-dictionary with 'region_name' and 'object_storage_url'.
        """
        self.options = options

        pool_opt = options.get("vault_s3_max_pool_connections")
        max_pool_size = int(pool_opt) if pool_opt else DEFAULT_MAX_POOL_SIZE

        # Build the botocore Config once; only add a signature version when
        # the configuration asks for a non-default one.
        config_kwargs = {
            'read_timeout': int(options['s3_read_timeout']),
            'max_pool_connections': max_pool_size,
        }
        if options['s3_signature'] != 'default':
            config_kwargs['signature_version'] = options['s3_signature']
        config_object = botocore.client.Config(**config_kwargs)

        # Build the client the same way in both cases; 'verify' is only
        # passed when a CA certificate bundle has been configured.
        client_kwargs = {
            'region_name': options['os_options']['region_name'],
            'aws_access_key_id': options['user'],
            'aws_secret_access_key': options['key'],
            'endpoint_url': options['os_options']['object_storage_url'],
            'config': config_object,
        }
        if options['s3_ssl_cert'] != '':
            client_kwargs['verify'] = options['s3_ssl_cert']
        s3_client = boto3.client('s3', **client_kwargs)

        def set_connection_header(request, operation_name, **kwargs):
            # Reuse TCP connections instead of opening one per request.
            request.headers['Connection'] = 'keep-alive'

        s3_client.meta.events.register('request-created.s3', set_connection_header)
        self.__client = s3_client
        self.__bucket_name = options['bucket']
        self.__retention_mode = options['retention_mode']

    def set_retention_mode(self, options):
        """ Look up and cache the bucket's retention mode.

        Updates options['retention_mode'] in place and mirrors the value on
        this backend instance.

        Args:
            options (dic): Dictionary of configuration options containing
                'bucket' and 'bucket_object_lock'.
        """
        mode = get_retention_mode(self.__client, options['bucket'],
                                  options['bucket_object_lock'])
        options['retention_mode'] = mode
        self.__retention_mode = mode

    def validate_s3_client(self, options):
        """ Validate the S3 credentials / permissions on the provided bucket.

        Checks that the bucket is reachable, that the bucket's object-lock
        state matches the 'bucket_object_lock' configuration option, and,
        for buckets without object lock, that the object store accepts the
        long (>255 character) key paths this backend generates.

        Args:
            options (dic): Dictionary of configuration options.

        Returns:
            Does not return anything, but raises an exception if validation
            fails.
        """
        self.__client.head_bucket(Bucket=self.__bucket_name)

        obj_lock_cfg = None
        try:
            obj_lock_cfg = self.__client.get_object_lock_configuration(
                Bucket=self.__bucket_name)
        except Exception:
            # Many S3 implementations do not support the object-lock API at
            # all; that is only fatal when the bucket is expected to be
            # object-locked.
            if options['bucket_object_lock']:
                raise
            LOG.debug("Object lock configuration query failed; "
                      "skipping object lock validation", exc_info=True)

        if obj_lock_cfg is not None:
            lock_enabled = obj_lock_cfg.get(
                "ObjectLockConfiguration", {}).get(
                "ObjectLockEnabled") == "Enabled"
            if options['bucket_object_lock'] and not lock_enabled:
                raise Exception("Configration says object lock enabled on the bucket, "
                                "but s3 API does not agree with it. %s" % obj_lock_cfg)
            # Bug fix: this branch means the configuration claims the bucket
            # is NOT object-locked while the API says it is; the original
            # message was copied from the branch above and said "enabled".
            if not options['bucket_object_lock'] and lock_enabled:
                raise Exception("Configuration says object lock disabled on the bucket, "
                                "but s3 API does not agree with it. %s" % obj_lock_cfg)

        if options['bucket_object_lock'] and not options['use_manifest_suffix']:
            raise Exception("'use_manifest_suffix' must be set to True when "
                            "'bucket_object_lock' is set to True")

        # Check that the current object store supports our path length by
        # round-tripping a deliberately long key.
        if not options['bucket_object_lock']:
            long_key = os.path.join(
                'tvault_config/',
                'workload_f5190be6-7f80-4856-8c24-149cb40500c5/',
                'snapshot_f2e5c6a7-3c21-4b7f-969c-915bb408c64f/',
                'vm_id_e81d1ac8-b49a-4ccf-9d92-5f1ef358f1be/',
                'vm_res_id_72477d99-c475-4a5d-90ae-2560f5f3b319_vda/',
                'deac2b8a-dca9-4415-adc1-f3c6598204ed-segments/',
                '0000000000000000.00000000')
            self.__client.put_object(
                Bucket=self.__bucket_name, Key=long_key, Body='Test Data')
            self.__client.delete_object(Bucket=self.__bucket_name, Key=long_key)

    def __delete_object_list(self, object_list):
        """ Delete the given objects in batches.

        The S3 delete_objects() API accepts at most 1,000 keys per call, so
        the list is processed in MAX_S3_PAGE_SIZE chunks. If a batch delete
        fails (not every object store implements it) the chunk is deleted
        one object at a time instead.

        Args:
            object_list (list): A list of object keys with the correct S3
                path.
        """
        for start in range(0, len(object_list), MAX_S3_PAGE_SIZE):
            # S3 object names cannot start with "/"; strip a single
            # leading slash, matching the rest of this backend.
            object_delete_list = [
                {'Key': obj[1:] if obj.startswith("/") else obj}
                for obj in object_list[start:start + MAX_S3_PAGE_SIZE]]
            try:
                self.__client.delete_objects(
                    Bucket=self.__bucket_name,
                    Delete={'Objects': object_delete_list})
            except Exception:
                # Batch delete unsupported or failed; fall back to deleting
                # each object individually.
                for obj in object_delete_list:
                    self.__client.delete_object(Bucket=self.__bucket_name,
                                                Key=obj['Key'])

    def __delete_object_tree(self, object_tree):
        """ Perform an rmdir-style delete of an entire object hierarchy.

        Collects every key under the given prefix and hands the list to
        __delete_object_list() for batched deletion.

        Args:
            object_tree (str): Object/Tree path prefix.
        """
        object_list = []
        # The list_objects API is limited to 1,000 objects per call, so use
        # a paginator to walk the whole tree.
        list_paginator = self.__client.get_paginator('list_objects')
        page_iterator = list_paginator.paginate(
            Bucket=self.__bucket_name, Prefix=object_tree)
        for page in page_iterator:
            contents = page.get('Contents') or []
            if contents and page.get('KeyCount', len(contents)) > 0:
                object_list.extend(obj['Key'] for obj in contents)

        self.__delete_object_list(object_list)

    def delete_object_list(self, object_list, options):
        """ Delete a list of objects from the object store.

        Thin public wrapper around the private batched-delete helper.

        Args:
            object_list (list): List of objects, each with a full object
                path in the correct format.
            options (dic): Dictionary of configuration options.
        """
        self.__delete_object_list(object_list)

    def delete_object(self, args, options):
        """ Delete an object from the S3 object store.

        Segment objects are deleted directly. For any other object the
        newest matching ".manifest" object is deleted instead.

        Args:
            args (list): List of object name parts.
            options (dic): Dictionary of configuration options.
        """
        if not args:
            return

        object_name = '/'.join(args)
        # S3 object names cannot start with "/"
        if object_name.startswith("/"):
            object_name = object_name[1:]

        if '-segments' in object_name:
            # A single segment: remove it and we are done.
            self.__client.delete_object(Bucket=self.__bucket_name,
                                        Key=object_name)
            return

        # Otherwise delete the newest manifest for this object, falling
        # back to the bare manifest name when none are listed.
        manifest_prefix = object_name.strip('/') + ".manifest"
        manifests = self.list_objects(manifest_prefix, strict_path_filter=True)
        manifest_name = max(manifests) if manifests else manifest_prefix
        self.__client.delete_object(Bucket=self.__bucket_name,
                                    Key=manifest_name)

    def __wait_for_object(self, object_name, retries=1):
        """ Wait for an S3 object to become visible via head_object().

        Performs a head_object() request up to ``retries`` times with a one
        second pause between attempts. This was added to work around any
        potential eventual consistency issues when uploading to AWS.

        Args:
            object_name (str): Name of the object.
            retries (int): Number of attempts; forced to at least 1.

        Returns:
            None once the object is available.

        Raises:
            ClientError: If the object is still missing (HTTP 404) after
                the final attempt, or immediately on any non-404 error.
        """
        # Make sure retries is at least 1.
        retries = max(1, retries)
        for attempt in range(retries):
            try:
                self.__client.head_object(Bucket=self.__bucket_name,
                                          Key=object_name)
                return
            except ClientError as error:
                status = error.response['ResponseMetadata']['HTTPStatusCode']
                if status == 404 and attempt + 1 != retries:
                    time.sleep(1)
                    continue
                raise

    def get_object_manifest(self, object_name):
        """ Download and return the object manifest as a json array.

            Args:
                object_name (str): Name of the object.

            Returns:
                Object manifest as a dictionary.

            Raises:
                ClientError: On any non-404 S3 error, or when the manifest
                    is still missing after the eventual-consistency retry.
        """
        try:
            manifest_prefix = object_name.strip('/') + ".manifest"
            if self.options['use_manifest_suffix']:
                # Object-locked buckets keep multiple immutable manifest
                # versions, so list versions instead of plain objects.
                if self.options['bucket_object_lock']:
                    manifests = self.__client.list_object_versions(
                                    Bucket=self.__bucket_name,
                                    Prefix=manifest_prefix)
                else:
                    manifests = self.__client.list_objects(
                                    Bucket=self.__bucket_name,
                                    Prefix=manifest_prefix)
                if manifests:
                    manifest_name = manifest_prefix
                    # Normalize to a list of entries regardless of which
                    # listing API produced the response.
                    manifests = manifests.get('Versions', []) or manifests.get('Contents', [])

                    if self.options['bucket_object_lock']:
                        # Scan the versions for one stamped as authentic by
                        # Trilio and return its body immediately when found.
                        for manifest in manifests:
                            resp = self.__client.get_object(
                                Bucket=self.__bucket_name, Key=manifest['Key'],
                                VersionId=manifest.get('VersionId', ""))
                            if resp['ResponseMetadata']['HTTPHeaders'].\
                                get('x-amz-meta-stamp-trilio-authenticity', "False") == "True":
                                return json.loads(resp['Body'].read(resp['ContentLength']))

                    # Fall back to the last listed manifest key.
                    # NOTE(review): this raises IndexError when the listing
                    # matched nothing — presumably callers treat that as
                    # "not found"; confirm.
                    manifest_name = manifests[-1]['Key']
            else:
                manifest_name = manifest_prefix

            if not manifest_name:
                raise Exception("Manifest object with "
                                "'x-amz-meta-stamp-trilio-authenticity' "
                                "attribute not found")
            resp = self.__client.get_object(
                Bucket=self.__bucket_name, Key=manifest_name)
            return json.loads(resp['Body'].read(resp['ContentLength']))
        except ClientError as error:
            # Non-404 errors are fatal. On 404, wait out eventual
            # consistency and retry the download once more.
            if error.response['ResponseMetadata']['HTTPStatusCode'] != 404:
                raise
            else:
                # NOTE(review): manifest_name may be unbound here if the
                # listing call itself raised the 404 — confirm this path
                # against real callers.
                try:
                    self.__wait_for_object(manifest_name, 10)
                    resp = self.__client.get_object(Bucket=self.__bucket_name,
                                                    Key=manifest_name)
                    return json.loads(resp['Body'].read(resp['ContentLength']))
                except ClientError:
                    raise

    def delete_object_manifest(self, object_name):
        """ Delete all manifest objects (and versions) for the given object.

            Note: the original docstring was copied from
            get_object_manifest(); this method deletes, it does not
            download.

            Args:
                object_name (str): Name of the object.

            Raises:
                Exception: If no listing response could be obtained.
        """
        manifest_prefix = object_name.strip('/') + ".manifest"
        if self.options['bucket_object_lock']:
            resp = self.__client.list_object_versions(
                            Bucket=self.__bucket_name,
                            Prefix=manifest_prefix)
        else:
            resp = self.__client.list_objects(
                            Bucket=self.__bucket_name,
                            Prefix=manifest_prefix)
        if not resp:
            # NOTE(review): listing responses are dicts and normally truthy
            # even with no matches, so in practice a missing manifest makes
            # this method a silent no-op rather than raising — confirm that
            # is the intended contract before tightening this check.
            raise Exception("Manifest for %s is not found" % object_name)

        manifests = resp.get('Versions', []) or resp.get('Contents', [])
        objs_to_delete = [{'Key': man['Key']} for man in manifests]
        if objs_to_delete:
            try:
                self.__client.delete_objects(Bucket=self.__bucket_name,
                                             Delete={'Objects': objs_to_delete})
            except Exception:
                # Batch delete unsupported or failed; delete one at a time.
                for obj in objs_to_delete:
                    self.__client.delete_object(Bucket=self.__bucket_name,
                                                Key=obj['Key'])

    def download_object(self, args, options):
        """ Download a file from the S3 object store.

        Args:
            args (list): List of object name parts.
            options (dic): Dictionary of configuration options.

        Returns:
            On success, the contents of the object are downloaded to the
            file identified by options.out_file.
        """
        object_name = '/'.join(args)
        extra_args = {}
        if options.version_id:
            # Fetch a specific object version when one was requested.
            extra_args = {'VersionId': options.version_id}
        # The fourth positional argument of download_file is ExtraArgs.
        self.__client.download_file(
            self.__bucket_name, object_name, options.out_file, extra_args)

    def get_object(self, args, options):
        """ Get an object from the S3 object store into a buffer.

        Args:
            args (list): List of object name parts.
            options (dic): Dictionary of configuration options; may carry a
                version_id attribute to fetch a specific version.

        Returns:
            The contents of the object as a byte string. (The original
            docstring incorrectly claimed the data was written to
            options.out_file.)
        """
        object_name = '/'.join(args)
        kwargs = {'Bucket': self.__bucket_name, 'Key': object_name}
        if options.version_id:
            # Fetch a specific object version when one was requested.
            kwargs['VersionId'] = options.version_id
        response = self.__client.get_object(**kwargs)
        return response["Body"].read()

    def put_object(self, args, buf, buf_len, options):
        """ Put an object to the S3 object store.

        Args:
            args (list): List of object name parts.
            buf: Data to upload in object.
            buf_len: Length of data to upload in object.
            options: Options object carrying path_valid and object_name.

        Returns:
            The put_object() response dictionary (contains the ETag,
            VersionId and other fields) when an object name was supplied,
            otherwise None.
        """
        # First check the path_valid flag to see if this is not the first
        # segment. Added for performance when tuning; avoids re-creating
        # the folder marker on every segment upload.
        if options.path_valid is False:
            self.__create_folder(
                os.path.dirname(args[0] + '/' + options.object_name), options)

        if options.object_name is not None:
            # Bug fix: the original mixed options['object_name'] (item
            # access) with options.object_name (attribute access) in the
            # same method; normalized to attribute access to match the
            # guarding conditionals above.
            resp = self.__client.put_object(
                Bucket=self.__bucket_name,
                Key=args[0] + '/' + options.object_name,
                ContentLength=buf_len,
                Body=buf,
                ContentType='application/octet-stream')

            # The response already contains the etag, version id and any
            # necessary fields, so we avoid calling head_object().
            return resp

    def list_objects(self, prefix, max_items=None, strict_path_filter=False):
        """ Return a list of objects based on the provided prefix.

        Used to generate a directory listing based on a prefix path
        constructed from the object parts in prefix.

        Args:
            prefix (string): prefix of the object path. If the prefix is
                terminated by "/", then it lists the contents in the
                "subdir". If the prefix is not terminated by "/", then it
                lists the contents in the current directory that matches
                the prefix.
                Ex:
                    case 1:
                    =======
                    prefix - /a/b/c
                    object store has objects - /a/b/c, /a/b/c1, /a/b/d
                    return values - /a/b/c and /a/b/c1

                    case 2:
                    =======
                    prefix - /a/b/c/
                    object store has objects - /a/b/c/d, /a/b/c1, /a/b/c/e
                    return values - /a/b/c/d and /a/b/c/e
            max_items (int): Maximum number of objects in a list query.
                Default is unlimited, but when testing for an empty
                directory a smaller number can be used to limit the query
                and the size of the list that is returned.
            strict_path_filter (bool): Defaults to False (the original
                docstring incorrectly said True). When True only exact
                object keys are returned; when False keys are trimmed at
                ".manifest" so segmented objects list under their logical
                name.

        Returns:
            On success a list of unique items to be used as a directory
            listing.
        """
        object_set = set()
        prefix = prefix.lstrip("/")

        def _limit_reached():
            # True once we have collected max_items entries.
            return max_items is not None and len(object_set) >= max_items

        def _collect_keys(items):
            """Fold Contents/Versions entries into object_set; True if full."""
            for item in items:
                if _limit_reached():
                    return True
                object_name = item['Key'].rstrip('/')
                if strict_path_filter:
                    object_set.add(object_name)
                elif ".manifest" in object_name:
                    # Trim ".manifest" and everything after it.
                    object_set.add(object_name.split(".manifest", 1)[0])
            return False

        # The list APIs are limited to 1,000 objects, so use a paginator.
        # Note: page size is just a "hint" to the API, which may return
        # more or fewer items per page.
        page_size = MAX_S3_PAGE_SIZE
        if max_items is not None and max_items < MAX_S3_PAGE_SIZE:
            page_size = max_items
        if self.options['bucket_object_lock']:
            list_paginator = self.__client.get_paginator('list_object_versions')
        else:
            list_paginator = self.__client.get_paginator('list_objects')
        page_iterator = list_paginator.paginate(
            Bucket=self.__bucket_name,
            Prefix=prefix,
            Delimiter='/',
            PaginationConfig={'PageSize': page_size})
        for objects in page_iterator:
            # 'Contents' comes from list_objects, 'Versions' from
            # list_object_versions; both were handled by duplicated loops
            # in the original.
            for section in ('Contents', 'Versions'):
                items = objects.get(section) or []
                if items and objects.get('KeyCount', len(items)) > 0:
                    if _collect_keys(items):
                        return list(object_set)

            for object_prefix in objects.get('CommonPrefixes', []):
                if _limit_reached():
                    return list(object_set)
                # Filter out the segment directory tree unless we are
                # explicitly listing inside it. (The original if/else here
                # had two identical branches; collapsed to one.)
                if prefix.startswith(_SEGMENT_PREFIX) or \
                        not object_prefix['Prefix'].startswith(_SEGMENT_PREFIX):
                    object_set.add(object_prefix['Prefix'].strip('/'))

        return list(object_set)

    def list_segments(self, args, options):
        """ Return a list of object segments based on a prefix.

        Args:
            args (list): List of object name parts.
            options (dic): Dictionary of configuration options; options.prefix
                selects the segment offset prefix.

        Returns:
            A list of segments in the object store that match the supplied
            prefix.
        """
        # The original assigned an unused 'segment_list' local and wrapped
        # everything in a no-op except/raise; both removed.
        segments_path = os.path.join("/".join(args), options.prefix)
        return self.list_objects(segments_path, strict_path_filter=True)

    def __get_object_headers(self, object_name, retries=1):
        """ Utility method that gets an object head from the repository with retry support.

        The default is to try once. Admittedly, this is ugly because the
        provided object name might be a terminal object or a "directory" so
        we need to try both cases for each retry attempt. When retry > 1, there is
        a 1 second delay between attempts.

        Lookup order on each attempt:
            1. head_object(object_name)        -- a terminal object
            2. head_object(object_name + '/')  -- a directory marker
            3. head_object(<manifest key>)     -- a segmented object
               (skipped when the name is inside a segments directory)

        Args:
            object_name (str): Name of the object.
            retries (int): Optional parameter, default is 1 attempt.

        Returns:
            Object head dictionary or Boto3 exception.
        """
        # Prevent retries from being set to less than 1.
        retries = max(1, retries)
        try:
            for retry in range(0, retries):
                try:
                    obj_headers = self.__client.head_object(Bucket=self.__bucket_name,
                                                            Key=object_name)
                    return obj_headers
                except ClientError as error:
                    if error.response['ResponseMetadata']['HTTPStatusCode'] == 404:
                        # Not a terminal object; retry as a directory
                        # marker ("name/").
                        try:
                            obj_headers = self.__client.head_object(Bucket=self.__bucket_name,
                                                                    Key=object_name + '/')
                            # Hack to support Cohesity: it appears to ignore
                            # the ContentType passed to put_object, so force
                            # the directory content type here.
                            obj_headers['ContentType'] = obj_headers.get('ContentType') or 'application/x-directory'
                            return obj_headers
                        except ClientError as error:
                            if error.response['ResponseMetadata']['HTTPStatusCode'] == 404:
                                if '-segments' not in object_name:
                                    # Not a directory either; fall back to
                                    # the object's manifest.
                                    try:
                                        manifest_prefix = object_name.strip('/') + ".manifest"
                                        # Object-locked buckets keep multiple
                                        # manifest versions; list those.
                                        if self.options['bucket_object_lock']:
                                            manifests = self.__client.list_object_versions(
                                                            Bucket=self.__bucket_name,
                                                            Prefix=manifest_prefix)
                                        else:
                                            manifests = self.__client.list_objects(
                                                            Bucket=self.__bucket_name,
                                                            Prefix=manifest_prefix)
                                        manifests = manifests.get('Versions', []) or manifests.get('Contents', [])
                                        if manifests:
                                            # Oldest first; [-1] is newest.
                                            manifests = sorted(manifests,
                                                               key=lambda k: k['LastModified'])

                                            manifest_name = None
                                            if self.options['bucket_object_lock']:
                                                # Prefer a version stamped as
                                                # authentic by Trilio.
                                                for manifest in manifests:
                                                    obj_headers = self.__client.head_object(Bucket=self.__bucket_name,
                                                                                            Key=manifest['Key'],
                                                                                            VersionId=manifest.get('VersionId', ""))
                                                    if obj_headers['ResponseMetadata']['HTTPHeaders'].\
                                                        get('x-amz-meta-stamp-trilio-authenticity', "False") == "True":
                                                        return obj_headers
                                            manifest_name = manifests[-1]['Key']

                                            # NOTE(review): manifest_name was
                                            # assigned just above, so this
                                            # branch appears unreachable --
                                            # confirm original intent.
                                            if not manifest_name:
                                                raise Exception("Manifest object with "
                                                                "'x-amz-meta-stamp-trilio-authenticity' "
                                                                "attribute not found")
                                        else:
                                            manifest_name = manifest_prefix

                                        obj_headers = self.__client.head_object(
                                            Bucket=self.__bucket_name,
                                            Key=manifest_name)
                                        return obj_headers
                                    except ClientError as error:
                                        if (error.response[
                                                'ResponseMetadata'][
                                                'HTTPStatusCode'] == 404 and
                                                retry + 1 != retries):
                                            # Still missing: pause and retry
                                            # the whole lookup sequence.
                                            time.sleep(1)
                                            continue
                                        raise
                                # Segment name (or manifest path exhausted):
                                # pause and retry when attempts remain.
                                if retry + 1 != retries:
                                    time.sleep(1)
                                    continue
                            raise
                    raise
        except ClientError:
            raise

    def stat_object(self, args, options):
        """ Get "operating system like" stat data for the given object.

        Args:
            args (list): List of object name parts.
            options (dic): Dictionary of configuration options.

        Returns:
            A stat structure containing 'timestamp', 'size', 'etag',
            'directory', 'headers' and 'x-account-bytes-used'.
        """
        stat_data = {'timestamp': 0, 'size': 0, 'etag': "", 'directory': False,
                     'headers': {'Metadata': {}}}
        object_name = '/'.join(args)

        # An empty path refers to the bucket root; report "now" as its
        # modification time.
        if not args:
            stat_data['timestamp'] = _make_timestamp(datetime.datetime.now())
            return stat_data

        try:
            obj_header = self.__get_object_headers(object_name)
        except Exception:
            # Bug fix: was 'except BaseException', which also caught
            # KeyboardInterrupt/SystemExit.
            # The object itself was not found; it may still be a
            # "directory" prefix with children, so synthesize a directory
            # header in that case.
            objects = self.__client.list_objects(
                Bucket=self.__bucket_name,
                Delimiter='/',
                Prefix=object_name)
            prefixes = [x['Prefix'] for x in objects.get('CommonPrefixes', [])]
            if object_name + "/" not in prefixes:
                # Bug fix: the original fell through here with obj_header
                # unbound (NameError) when CommonPrefixes existed but did
                # not contain this prefix; re-raise the lookup failure
                # instead.
                raise
            md = datetime.datetime.strptime(
                objects['ResponseMetadata']['HTTPHeaders']['date'],
                "%a, %d %b %Y %H:%M:%S %Z")
            obj_header = {'ContentLength': 0,
                          'LastModified': md,
                          'Metadata': {}}

        stat_data['timestamp'] = _make_timestamp(obj_header['LastModified'])
        stat_data['headers'] = obj_header

        # Zero-length objects double as directory markers.
        if (obj_header.get('ContentType', 'text/plain') ==
                'application/x-directory' or
                obj_header['ContentLength'] == 0):
            stat_data['directory'] = True

        # Copy the Metadata sub-values into the stat structure in order to
        # conform to our API.
        for key, value in stat_data['headers']['Metadata'].items():
            if 'x-object-meta-' in key.lower():
                stat_data['headers'][
                    key.lower().split('x-object-meta-')[1]] = value
            else:
                stat_data['headers'][key.lower()] = value

        # Segmented objects report their logical total size rather than the
        # manifest's own content length.
        if int(stat_data['headers'][
                'Metadata'].get('x-object-meta-segment-count', 0)) > 0:
            stat_data['size'] = stat_data['headers']['Metadata'].get(
                'x-object-meta-total-size', 0)
        elif int(stat_data['headers'].get('segment-count', 0)) > 0:
            stat_data['size'] = stat_data['headers'].get('total-size', 0)
        else:
            stat_data['size'] = obj_header['ContentLength']

        stat_data['x-account-bytes-used'] = stat_data['size']
        return stat_data

    def mkdir_object(self, args, options):
        """ Create an object that represents a directory in the object store.

        Args:
            args (list): List of object name parts; joined with "/" to form
                         the directory object name.
            options (dic): Dictionary of configuration options.
        """
        # Delegate to the folder helper, which creates each missing level
        # of the path as a directory object.
        self.__create_folder('/'.join(args), options)
        return

    def rmdir_object(self, args, options):
        """ Remove an object that represents a directory in the object store.

        First recursively deletes everything under the directory prefix,
        then best-effort deletes the directory placeholder object itself.

        Args:
            args (list): List of object name parts.
            options (dic): Dictionary of configuration options (unused here).
        """
        object_tree = '/'.join(args) + '/'
        # S3 object names cannot start with "/"
        if object_tree.startswith("/"):
            object_tree = object_tree[1:]
        self.__delete_object_tree(object_tree)
        # Now remove the actual directory object. This is best effort: the
        # placeholder may never have been created. Catch Exception rather
        # than BaseException so KeyboardInterrupt/SystemExit are no longer
        # silently swallowed.
        try:
            self.__client.delete_object(Bucket=self.__bucket_name,
                                        Key=object_tree)
        except Exception:
            pass
        return

    def upload_object_manifest(self, object_name, put_headers, manifest_data):
        """ Upload a new object manifest to the object store.

        Args:
            object_name (str): Name of the object. A ".manifest" will be
                               added (plus a hex version suffix when
                               'use_manifest_suffix' is enabled).
            put_headers (dic): Dictionary of meta data to be added to the
                               manifest object.
            manifest_data (list): Per-segment manifest entries which become
                                  the JSON body of the manifest.
        """
        try:
            manifest_prefix = object_name.strip('/') + ".manifest"
            manifests = None
            if self.options['use_manifest_suffix']:
                # Version the manifest with a monotonically increasing
                # 8-digit hexadecimal suffix derived from the newest
                # existing manifest (or 0 when none exist yet).
                manifests = self.list_objects(manifest_prefix,
                                              strict_path_filter=True)
                if manifests:
                    latest = sorted(manifests, reverse=True)[0]
                    next_suffix = int(latest.split('.')[-1], 16) + 1
                    manifest_name = "%s.%08x" % (manifest_prefix, next_suffix)
                else:
                    manifest_name = "%s.%08x" % (manifest_prefix, 0)
            else:
                manifest_name = manifest_prefix

            # Rename segment fields to the swift-style names the manifest
            # readers expect ('path' -> 'name', 'etag' -> 'hash').
            for segment_data in manifest_data:
                segment_data['name'] = segment_data.pop('path')
                segment_data['hash'] = segment_data.pop('etag', 0)

            self.__client.put_object(Bucket=self.__bucket_name,
                                     Key=manifest_name,
                                     Metadata=put_headers,
                                     Body=json.dumps(manifest_data),
                                     ContentType='text/plain')

            if self.options['use_manifest_suffix'] and manifests:
                # Prune the superseded manifest versions. Fall back to
                # one-by-one deletion for backends that reject bulk delete.
                objs_to_delete = [{'Key': man} for man in manifests]
                try:
                    self.__client.delete_objects(
                        Bucket=self.__bucket_name,
                        Delete={'Objects': objs_to_delete})
                except BaseException:
                    for obj in objs_to_delete:
                        self.__client.delete_object(
                            Bucket=self.__bucket_name,
                            Key=obj['Key'])
        except ClientError:
            raise
        return

    def update_object_attributes(self, object_name, attributes):
        """ Replace the metadata attributes on an object's manifest.

        The manifest object is copied onto itself with
        MetadataDirective='REPLACE', which is the S3 idiom for rewriting an
        existing object's metadata in place.

        Args:
            object_name (str): Name of the object. A ".manifest" will be
                               added to locate the manifest key.
            attributes (dic): The new metadata attributes for the manifest.
        """
        try:
            manifest_prefix = object_name.strip('/') + ".manifest"

            if self.options['use_manifest_suffix']:
                manifests = self.list_objects(manifest_prefix,
                                              strict_path_filter=True)
                if manifests:
                    # Update the NEWEST EXISTING manifest. (Previously the
                    # suffix was incremented here, which pointed CopySource
                    # at a key that does not exist yet, so copy_object
                    # could not succeed.)
                    manifest_name = sorted(manifests, reverse=True)[0]
                else:
                    manifest_name = manifest_prefix + "." + "%08x" % 0
            else:
                manifest_name = manifest_prefix
            # S3 copy sources are always "/"-separated; os.path.join would
            # produce "\\" separators on Windows.
            copy_src = self.__bucket_name + '/' + manifest_name
            self.__client.copy_object(Bucket=self.__bucket_name,
                                      Key=manifest_name,
                                      CopySource=copy_src,
                                      Metadata=attributes,
                                      MetadataDirective='REPLACE')
        except ClientError:
            raise
        return

    def put_object_segment_retention(self, object_name, retainuntil):
        """ Apply an S3 object-lock retention policy to a segment object.

        Args:
            object_name (str): Name of the segment object. Leading and
                               trailing "/" characters are stripped to form
                               the S3 key.
            retainuntil: Date until which the object must be retained
                         (passed through as RetainUntilDate).

        Raises:
            ClientError: On any S3 failure (re-raised unchanged).
        """
        try:
            object_name = object_name.strip('/')
            # The lock mode comes from self.__retention_mode, resolved when
            # the backend instance was initialized.
            self.__client.put_object_retention(
                Bucket=self.__bucket_name,
                Key=object_name,
                Retention={"Mode": self.__retention_mode,
                           "RetainUntilDate": retainuntil}
            )
        except ClientError:
            raise
        return

    def put_manifest_retention(self, object_name, retainuntil):
        """ Apply an S3 object-lock retention policy to an object's manifest.

        Args:
            object_name (str): Name of the object; ".manifest" (plus any
                               hex version suffix) is appended to locate
                               the manifest key.
            retainuntil: Date until which the manifest must be retained
                         (passed through as RetainUntilDate).

        Raises:
            ClientError: On any S3 failure (re-raised unchanged).
        """
        try:
            manifest_prefix = object_name.strip('/') + ".manifest"
            if self.options['use_manifest_suffix']:
                manifests = self.list_objects(manifest_prefix,
                                              strict_path_filter=True)
                if manifests:
                    # Lock the NEWEST EXISTING manifest. (Previously the
                    # suffix was incremented here, targeting a manifest key
                    # that does not exist, so put_object_retention could
                    # never apply to the real manifest.)
                    manifest_name = sorted(manifests, reverse=True)[0]
                else:
                    manifest_name = manifest_prefix + "." + "%08x" % 0
            else:
                manifest_name = manifest_prefix
            self.__client.put_object_retention(
                Bucket=self.__bucket_name,
                Key=manifest_name,
                Retention={"Mode": self.__retention_mode,
                           "RetainUntilDate": retainuntil}
            )
        except ClientError:
            raise
        return

    def __create_folder(self, folder_name, options):
        """ Utility method used to create a object directory structure.

        The entire path is split and each directory level is created
        as an object if it does not exist.

        Args:
            folder_name (str): The entire folder name ("/"-separated path).
            options (dic): Dictionary of configuration options (unused in
                           this method).
        """
        try:
            # Fast path: if the folder object already exists, do nothing.
            self.__client.head_object(
                Bucket=self.__bucket_name, Key=folder_name)  # + '/')
        except ClientError as error:
            # NOTE(review): only a 404 triggers creation; any other HEAD
            # failure (e.g. 403) is silently swallowed here -- confirm
            # that is intentional.
            if error.response['ResponseMetadata']['HTTPStatusCode'] == 404:
                path_parts = folder_name.split('/')
                new_path = ''
                for part in path_parts:
                    # An empty part means a trailing (or doubled) slash;
                    # everything after it is skipped.
                    if part == '':
                        break
                    new_path = new_path + part + '/'
                    try:
                        # Probe each intermediate level before creating it.
                        self.__client.head_object(
                            Bucket=self.__bucket_name, Key=new_path)
                    except ClientError as error:
                        if error.response[
                                'ResponseMetadata'][
                                'HTTPStatusCode'] == 404:
                            try:
                                self.__client.put_object(
                                    Bucket=self.__bucket_name, Key=new_path,
                                    Body='TrilioVault directory object',
                                    ContentType='application/x-directory')
                            except ClientError as error:
                                # If the directory already exists, some
                                # backends (e.g. Minio) raise:
                                # ClientError('An error occurred
                                # (XMinioObjectExistsAsDirectory) when
                                # calling the PutObject operation: Object
                                # name already exists as a directory.',)
                                # Tolerate that 409; re-raise anything else.
                                if error.response[
                                        'ResponseMetadata'][
                                        'HTTPStatusCode'] != 409:
                                    raise
                        continue

    def upload_object(self, args, options):
        """ Upload an object to the S3 object store.

        Args:
            args (list): List of object name parts; args[0] is the
                         destination prefix and the remaining entries are
                         local file paths (only the first is uploaded).
            options: Configuration options; 'object_name' supplies the key
                     suffix and 'path_valid' signals whether the parent
                     folder objects already exist.

        Returns:
            The head_object response for the uploaded key, or None when
            options.object_name is None.
        """
        files = args[1:]
        try:
            # Only the first segment of an upload pays for the folder
            # probe; later segments arrive with path_valid already set.
            if options.path_valid is False:
                self.__create_folder(
                    os.path.dirname(args[0] + '/' + options['object_name']),
                    options)

            # Guard clause: nothing to upload without an object name.
            if options.object_name is None:
                return

            object_key = args[0] + '/' + options['object_name']
            with open(files[0], 'rb') as data:
                # If experimenting with transfer config settings (AWS
                # chunking), add Config=__transfer_config as the last
                # argument of upload_fileobj().
                self.__client.upload_fileobj(
                    data, Bucket=self.__bucket_name,
                    Key=object_key,
                    ExtraArgs={
                        'ContentType': 'application/octet-stream'
                    })
                return self.__client.head_object(
                    Bucket=self.__bucket_name,
                    Key=object_key)
        except ClientError:
            raise