# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2017 Trilio Data, Inc.
# All Rights Reserved.
""" Trilio S3 Backend implimentation

    This module contains the back end implimentation of all of all S3 specific
    support.
"""

# Disable pylint unused-argument checking because this is an interface
# implementation file and not all method arguments are used.
# pylint: disable=unused-argument


import datetime
import time
import json
import os

# AWS S3 API... why we're doing all this.
import boto3
import botocore
from botocore.exceptions import ClientError

BASENAME = "s3"
MAX_S3_PAGE_SIZE = 1000
_SEGMENT_PREFIX = "80bc80ff-0c51-4534-86a2-ec5e719643c2"
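# Well-known prefix that identifies the segment directory tree; list_objects()
# filters entries under this prefix out of directory listings.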


def _make_timestamp(modified_time):
    """ Utility function used to convert a datetime to an OS timestamp.

    Args:
        modified_time (datetime): Datetime object to convert to a Unix epoch timestamp.

    Returns:
        The value of modified_time as a Unix epoch timestamp (seconds, as a float).
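
    Example (one day past the epoch):

        >>> _make_timestamp(datetime.datetime(1970, 1, 2))
        86400.0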
    """
    naive_time = modified_time.replace(tzinfo=None)
    delta_seconds = (naive_time - datetime.datetime(1970, 1, 1)).total_seconds()
    return delta_seconds


class S3Backend(object):
    """ S3 Backend implimentation.

    A wrapper around the boto3 and botocore AWS S3 APIs. This class encapsulates all S3
    operations and exposes them as a backend storage instance.
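
    A sketch of the expected "options" mapping, inferred from __init__()
    (values are illustrative; other methods read additional keys such as
    "prefix" and "support_empty_dir"):

        {
            "user": "<access-key>",
            "key": "<secret-key>",
            "bucket": "<bucket-name>",
            "s3_ssl": True,
            "s3_signature": "default",
            "s3_read_timeout": "60",
            "os_options": {
                "region_name": "us-east-1",
                "object_storage_url": "https://s3.example.com",
            },
        }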
    """

    def __init__(self, options):
        config_object = None
        if options["s3_signature"] != "default":
            config_object = botocore.client.Config(
                signature_version=options["s3_signature"],
                read_timeout=int(options["s3_read_timeout"]),
            )
        else:
            config_object = botocore.client.Config(
                read_timeout=int(options["s3_read_timeout"])
            )

        self.__client = boto3.client(
            "s3",
            region_name=options["os_options"]["region_name"],
            use_ssl=options["s3_ssl"],
            aws_access_key_id=options["user"],
            aws_secret_access_key=options["key"],
            endpoint_url=options["os_options"]["object_storage_url"],
            config=config_object,
        )

        self.__bucket_name = options["bucket"]
        # Keep the TransferConfig here for future performance tweaking.
        # I'm not sure if third-party (non-AWS) S3 APIs will support this.
        # self.__transfer_config = boto3.s3.transfer.TransferConfig(multipart_threshold=10485760,
        #                                                           max_concurrency=10,
        #                                                           multipart_chunksize=5242880,
        #                                                           num_download_attempts=5,
        #                                                           max_io_queue=100,
        #                                                           io_chunksize=1024 * 1024,
        #                                                           use_threads=True)
        # self.__transfer_config = boto3.s3.transfer.TransferConfig(multipart_threshold=3355443,
        #                                                    max_concurrency=10,
        #                                                    multipart_chunksize=3355443,
        #                                                    num_download_attempts=5,
        #                                                    max_io_queue=100,
        #                                                    io_chunksize=262144,
        #                                                    use_threads=True)

    def __delete_object_list(self, object_list):
        """ Utility method that takes a list of objects and puts it in the correct format
            for the S3 delete_objects() API.

        Args:
            object_list (list): A list of object keys (full S3 paths).
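
            Example of one batched delete request (per the S3 delete_objects()
            request format; key names are illustrative):

                Delete={"Objects": [{"Key": "dir/obj1"}, {"Key": "dir/obj2"}]}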
        """
        try:
            # Call delete_objects once for every 1,000 objects since it has a limit
            # of 1,000 objects at a time.
            while len(object_list) > 0:
                object_delete_list = []
                list_part = object_list[:MAX_S3_PAGE_SIZE]
                for obj in list_part:
                    # S3 object names cannot start with "/"
                    if obj.startswith("/"):
                        obj = obj[1:]
                    object_delete_list.append({"Key": obj})

                self.__client.delete_objects(
                    Bucket=self.__bucket_name, Delete={"Objects": object_delete_list},
                )
                object_list = object_list[MAX_S3_PAGE_SIZE:]
        except ClientError:
            raise

    def __delete_object_tree(self, object_tree):
        """ Utility method used to perform a rmdir operation on an object hierarchy

        Args:
            object_tree (str): Object/tree path, used as a listing prefix.
        """
        try:
            object_list = []
            # The list_objects API is limited to 1,000 objects so we need to
            # use a paginator.
            list_paginator = self.__client.get_paginator("list_objects")
            page_iterator = list_paginator.paginate(
                Bucket=self.__bucket_name, Prefix=object_tree
            )
            for objects in page_iterator:
                if (
                    "Contents" in objects
                    and objects.get("KeyCount", len(objects["Contents"])) > 0
                ):
                    for obj in objects["Contents"]:
                        object_list.append(obj["Key"])

            self.__delete_object_list(object_list)
        except ClientError:
            raise

    def delete_object_list(self, object_list, options):
        """ Delete a list of objects.

        Args:
            object_list (list): List of objects, each with a full object path
                                in the correct format.
            options (dict): Dictionary of configuration options.
        """
        self.__delete_object_list(object_list)

    def delete_object(self, args, options):
        """ Delete an object from the S3 object store.

        The object and any segments stored in the segment directory will be
        removed.

        Args:
            args (list): List of object name parts.
            options (dict): Dictionary of configuration options.
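
            Example (illustrative): delete_object(["snap", "disk1"], options)
            deletes "snap/disk1.manifest"; when the joined name contains
            "-segments", only that single segment object is deleted.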
        """
        try:
            if not args:
                return

            object_name = "/".join(args)
            # S3 object names cannot start with "/"
            if object_name.startswith("/"):
                object_name = object_name[1:]

            if "-segments" in object_name:
                # Just delete a single segment.
                self.__client.delete_object(Bucket=self.__bucket_name, Key=object_name)
            else:
                self.__client.delete_object(
                    Bucket=self.__bucket_name, Key=object_name + ".manifest"
                )

        except ClientError:
            raise

    def __wait_for_object(self, object_name, retries=1):
        """ Utility method used to wait for a S3 object to become available.

        This routine will keep performing a head_object() request every second
        for "retries" number of times. This was added to work around any potential
        eventual consistancey issues when uploading to AWS.

        Args:
            object_name (str): Name of the object.
            retries (int): Optional parameter that defaults to 1.

        Returns:
            Returns when the object is available; raises a ClientError (404 not
            found) otherwise.
        """
        # Make sure retries is at least 1
        retries = max(1, retries)
        try:
            for retry in range(0, retries):
                try:
                    self.__client.head_object(
                        Bucket=self.__bucket_name, Key=object_name
                    )
                    return
                except ClientError as error:
                    if (
                        error.response["ResponseMetadata"]["HTTPStatusCode"] == 404
                        and retry + 1 != retries
                    ):
                        time.sleep(1)
                        continue
                    raise
        except ClientError:
            raise

    def get_object_manifest(self, object_name):
        """ Download and return the object manifest as a json array.

            Args:
                object_name (str): Name of the object.

            Returns:
                The parsed manifest (a list of segment dictionaries).
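
                Example manifest body (illustrative; this is the format written
                by upload_object_manifest()):

                    [{"name": "segments/0001", "hash": "<etag>"}, ...]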
        """
        manifest_name = object_name.strip("/") + ".manifest"
        try:
            resp = self.__client.get_object(
                Bucket=self.__bucket_name, Key=manifest_name
            )
            return json.loads(resp["Body"].read(resp["ContentLength"]))
        except ClientError as error:
            if error.response["ResponseMetadata"]["HTTPStatusCode"] != 404:
                raise
            else:
                try:
                    self.__wait_for_object(manifest_name, 10)
                    resp = self.__client.get_object(
                        Bucket=self.__bucket_name, Key=manifest_name
                    )
                    return json.loads(resp["Body"].read(resp["ContentLength"]))
                except ClientError:
                    raise

    def download_object(self, args, options):
        """ Download a file from the S3 object store.

        Args:
            args (list): List of object name parts.
            options (dict): Dictionary of configuration options.

        Returns:
            On success, the contents of the object are downloaded to the file
            identified by options.out_file.
        """
        try:
            object_name = "/".join(args)
            # err = self.__client.download_fileobj(Bucket=self.__bucket_name, Key=object_name,
            #                                      Fileobj=fileHandle)
            self.__client.download_file(
                self.__bucket_name, object_name, options.out_file
            )

        except ClientError:
            raise

    def list_objects(self, args, options, max_items=None, strict_path_filter=True):
        """ Return a list of objects based on the provided prefix.

        Used to generate a directory listing based on a prefix path
        constructed from the object parts in args[] and the options.prefix.

        Args:
            args (list): List of prefix key parts.
            options (dict): Dictionary of configuration options.
            max_items (int): Maximum number of objects in a list query. Default is
                unlimited but when testing for an empty directory a smaller
                number can be used to limit the query and size of the list that
                is returned.
            strict_path_filter (bool): Defaults to True in order to only return
                objects with the exact same prefix. Individual segment listings
                set this to False in order to get a match on the segment offset.

        Returns:
            On success, a list of unique items to be used as a directory listing.
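
        Example (illustrative; probes a path for emptiness as described above):

            empty = len(backend.list_objects(["workloads"], options, max_items=1)) == 0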
        """
        object_set = set()
        prefix = "/".join(args)
        if options["prefix"] is not None:
            prefix = prefix + "/" + options["prefix"]
            if prefix.endswith("/"):
                prefix = prefix[:-1]

        try:
            # The list_objects API is limited to 1,000 objects so we need to
            # use a paginator. Note: Page size is just a "hint" to the API.
            # The API may choose to return more or fewer items per page.
            page_size = MAX_S3_PAGE_SIZE
            if max_items is not None and max_items < MAX_S3_PAGE_SIZE:
                page_size = max_items
            list_paginator = self.__client.get_paginator("list_objects")
            if not args:
                page_iterator = list_paginator.paginate(
                    Bucket=self.__bucket_name,  # Prefix='',
                    Delimiter="/",
                    PaginationConfig={"PageSize": page_size},
                )
            else:
                page_iterator = list_paginator.paginate(
                    Bucket=self.__bucket_name,
                    Prefix=prefix,
                    PaginationConfig={"PageSize": page_size},
                )
            for objects in page_iterator:
                if (
                    "Contents" in objects
                    and objects.get("KeyCount", len(objects["Contents"])) > 0
                ):
                    split_token = prefix + "/"
                    for item in objects["Contents"]:
                        # Short circuit processing if we need fewer items.
                        if max_items is not None and len(object_set) >= max_items:
                            return list(object_set)

                        path, object_name = os.path.split(item["Key"].rstrip("/"))

                        # If this is an S3 backend that does not support empty
                        # directory objects, we need to hide the "hidden" file
                        # and return the directory that it is in.
                        if object_name == "x.hidden":
                            root_path, sub_dir = os.path.split(path)
                            if (
                                len(sub_dir) > 0
                                and root_path != ""
                                and root_path == prefix
                            ):
                                object_set.add(sub_dir)
                            continue
                        root_path, _ = os.path.split(prefix)
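                        # Keep only keys whose parent is the prefix itself or
                        # the prefix's parent; skip empty names and the
                        # prefix's own directory marker.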
                        if (
                            (root_path != path and path != prefix)
                            or object_name == ""
                            or item["Key"] == split_token
                        ):
                            continue

                        # Keep this code here for now. I might need to go back to this
                        # detailed parsing. - cjk
                        # key_parts = string.split(item['Key'], split_token)
                        # if key_parts[0] == '.':
                        #     continue
                        #
                        # if split_token == '/' and key_parts[0] != '':
                        #     object_name = key_parts[0]
                        # else:
                        #     if len(key_parts) > 1 and key_parts[0] == '' and key_parts[1] != '':
                        #         sub_part = string.split(key_parts[1], '/')
                        #         object_name = sub_part[0]
                        #     else:
                        #         continue

                        if not strict_path_filter or path == prefix:
                            # Trim ".manifest" and everything after it if that is in the name.
                            object_set.add(object_name.split(".manifest", 1)[0])

                if "CommonPrefixes" in objects:
                    for object_prefix in objects["CommonPrefixes"]:
                        # Short circuit processing if we need fewer items.
                        if max_items is not None and len(object_set) >= max_items:
                            return list(object_set)
                        # Filter out the segment directory tree.
                        if not object_prefix["Prefix"].startswith(_SEGMENT_PREFIX):
                            object_set.add(object_prefix["Prefix"])

            return list(object_set)

        except ClientError:
            raise

    def list_segments(self, args, options):
        """ Returns a list of object segments based on a prefix.

        Args:
            args (list): List of object name parts.
            options (dict): Dictionary of configuration options.

        Returns:
            A list of segments in the object store that match the supplied prefix.
        """
        try:
            segment_list = []
            segments_path, _ = os.path.split(options.prefix)
            object_list = self.list_objects(args, options, strict_path_filter=False)
            for item in object_list:
                segment_list.append(segments_path + "/" + item)

            return segment_list

        except ClientError:
            raise

    def __get_object_headers(self, object_name, retries=1):
        """ Utility method that gets an object head from the repository with retry support.

        The default is to try once. Admittedly, this is ugly because the
        provided object name might be a terminal object or a "directory" so
        we need to try both cases on each retry attempt. When retries > 1, there is
        a one-second delay between attempts.

        Args:
            object_name (str): Name of the object.
            retries (int): Optional parameter, default is 1 attempt.

        Returns:
            The object's head_object() response dictionary; raises a boto3
            ClientError on failure.
        """
        # Prevent retries from being set to less than 1.
        retries = max(1, retries)
        try:
            for retry in range(0, retries):
                try:
                    obj_headers = self.__client.head_object(
                        Bucket=self.__bucket_name, Key=object_name
                    )
                    return obj_headers
                except ClientError as error:
                    if error.response["ResponseMetadata"]["HTTPStatusCode"] == 404:
                        try:
                            obj_headers = self.__client.head_object(
                                Bucket=self.__bucket_name, Key=object_name + "/",
                            )
                            return obj_headers
                        except ClientError as error:
                            if (
                                error.response["ResponseMetadata"]["HTTPStatusCode"]
                                == 404
                            ):
                                if "-segments" not in object_name:
                                    try:
                                        obj_headers = self.__client.head_object(
                                            Bucket=self.__bucket_name,
                                            Key=object_name + ".manifest",
                                        )
                                        return obj_headers
                                    except ClientError as error:
                                        if (
                                            error.response["ResponseMetadata"][
                                                "HTTPStatusCode"
                                            ]
                                            == 404
                                            and retry + 1 != retries
                                        ):
                                            time.sleep(1)
                                            continue
                                        raise
                                if retry + 1 != retries:
                                    time.sleep(1)
                                    continue
                            raise
                    raise
        except ClientError:
            raise

    def stat_object(self, args, options):
        """ Get "operating system like" stat data for the given object.

        Args:
            args (list): List of object name parts.
            options (dict): Dictionary of configuration options.

        Returns:
            A stat structure containing object information required by caller.
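
        Example return value (illustrative field values):

            {"timestamp": 1500000000.0, "size": 1024, "etag": "",
             "directory": False, "headers": {...},
             "x-account-bytes-used": 1024}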
        """
        stat_data = {"timestamp": 0, "size": 0, "etag": "", "directory": False}
        stat_data["headers"] = {}
        stat_data["headers"]["Metadata"] = {}
        object_name = "/".join(args)
        try:
            if not args:
                stat_data["timestamp"] = _make_timestamp(datetime.datetime.now())
                return stat_data
            else:
                obj_header = self.__get_object_headers(object_name)
                stat_data["timestamp"] = _make_timestamp(obj_header["LastModified"])
                stat_data["headers"] = obj_header
                if (
                    obj_header.get("ContentType", "text/plain")
                    == "application/x-directory"
                    or obj_header["ContentLength"] == 0
                ):
                    stat_data["directory"] = True

                # Copy the Metadata sub-values into the stat structure in order
                # to conform to our API.
                for key, value in stat_data["headers"]["Metadata"].items():
                    if "x-object-meta" in key:
                        stat_data["headers"][key] = value

                if (
                    int(
                        stat_data["headers"]["Metadata"].get(
                            "x-object-meta-segment-count", 0
                        )
                    )
                    > 0
                ):
                    stat_data["size"] = stat_data["headers"]["Metadata"].get(
                        "x-object-meta-total-size", 0
                    )
                else:
                    stat_data["size"] = obj_header["ContentLength"]

                stat_data["x-account-bytes-used"] = stat_data["size"]
                return stat_data

        except ClientError:
            raise

    def mkdir_object(self, args, options):
        """ Create an object that represents a directory in the object store.

        Args:
            args (list): List of object name parts.
            options (dict): Dictionary of configuration options.
        """
        new_folder = "/".join(args)
        self.__create_folder(new_folder, options)
        return

    def rmdir_object(self, args, options):
        """ Remove an object that represents a directory in the object store.

        Args:
            args (list): List of object name parts.
            options (dict): Dictionary of configuration options.
        """
        object_tree = "/".join(args) + "/"
        # S3 object names cannot start with "/"
        if object_tree.startswith("/"):
            object_tree = object_tree[1:]
        self.__delete_object_tree(object_tree)
        # Now remove the actual directory object.
        self.__client.delete_object(Bucket=self.__bucket_name, Key=object_tree)
        return

    def upload_object_manifest(self, object_name, put_headers, manifest_data):
        """ Upload a new object manifest to the object store.

        Args:
            object_name (str): Name of the object; a ".manifest" suffix is appended.
            put_headers (dict): Dictionary of metadata to be added to the object.
            manifest_data (list): The manifest data, which becomes the body of the file.
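
            Example (illustrative): an input entry such as
            {"path": "segments/0001", "etag": "abc123"} is rewritten to
            {"name": "segments/0001", "hash": "abc123"} before being uploaded
            as the JSON body of "<object_name>.manifest".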
        """
        try:
            # A workaround to make the S3 manifest behave like the original Swift one.
            for segment_data in manifest_data:
                segment_data["name"] = segment_data.pop("path")
                segment_data["hash"] = segment_data.pop("etag", 0)
            manifest = json.dumps(manifest_data)
            manifest_name = object_name.strip("/") + ".manifest"
            self.__client.put_object(
                Bucket=self.__bucket_name,
                Key=manifest_name,
                Metadata=put_headers,
                Body=manifest,
                ContentType="text/plain",
            )
        except ClientError:
            raise
        return

    def __create_folder(self, folder_name, options):
        """ Utility method used to create a object directory structure.

        The entire path is split and each directory level is created as an object
        if it does not exist.

        Args:
            folder_name (str): The entire folder name.
            options (dict): Dictionary of configuration options.
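
            Example (illustrative): "base/dir1/dir2" creates the directory
            objects "base/", "base/dir1/", and "base/dir1/dir2/" (or an
            "x.hidden" marker at each level when support_empty_dir is enabled).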
        """
        try:
            self.__client.head_object(Bucket=self.__bucket_name, Key=folder_name)
        except ClientError as error:
            # Only a 404 ("not found") should trigger folder creation; re-raise
            # any other error.
            if error.response["ResponseMetadata"]["HTTPStatusCode"] != 404:
                raise
            path_parts = folder_name.split("/")
            new_path = ""
            for part in path_parts:
                if part == "":
                    break
                new_path = new_path + part + "/"
                try:
                    self.__client.head_object(
                        Bucket=self.__bucket_name, Key=new_path
                    )
                except ClientError as error:
                    if error.response["ResponseMetadata"]["HTTPStatusCode"] != 404:
                        raise
                    if options["support_empty_dir"] is True:
                        # For S3 backends (Minio) that do not return a directory
                        # if it is empty, we need to actually create an object
                        # that we will keep hidden.
                        self.__client.put_object(
                            Bucket=self.__bucket_name,
                            Key=new_path + "x.hidden",
                            Body="Do Not Remove",
                        )
                    else:
                        self.__client.put_object(
                            Bucket=self.__bucket_name,
                            Key=new_path,
                            Body="TrilioVault directory object",
                            ContentType="application/x-directory",
                        )

    def upload_object(self, args, options):
        """ Upload an object to the S3 object store.

        Args:
            args (list): List of object name parts.
            options (dict): Dictionary of configuration options.
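
            Example (illustrative): args = ["workloads/wl-1", "/tmp/segment-0001"]
            uploads the local file to "workloads/wl-1/" + options["object_name"].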
        """
        assert len(args) == 2
        files = args[1:]
        try:
            # Check the path_valid flag first: the folder only needs to be created
            # for the first segment. Note: added for performance reasons during
            # tuning; revisit if it becomes problematic.
            if options.path_valid is False:
                self.__create_folder(
                    os.path.dirname(args[0] + "/" + options["object_name"]), options,
                )
            if options.object_name is not None:
                try:
                    with open(files[0], "rb") as data:
                        self.__client.upload_fileobj(
                            data,
                            Bucket=self.__bucket_name,
                            Key=args[0] + "/" + options["object_name"],
                            ExtraArgs={"ContentType": "application/octet-stream"},
                        )
                except Exception:
                    # Blind retry once after a delay in case the failure was
                    # transient.
                    time.sleep(10)
                    with open(files[0], "rb") as data:
                        self.__client.upload_fileobj(
                            data,
                            Bucket=self.__bucket_name,
                            Key=args[0] + "/" + options["object_name"],
                            ExtraArgs={"ContentType": "application/octet-stream"},
                        )
                    # If experimenting with transfer config settings (a.k.a AWS chunking)
                    # include the argument below as the last argument in the upload_fileobj()
                    # call above. - cjk
                    # Config=__transfer_config
        except ClientError:
            raise