Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ccc-model-manager / lib / python3.9 / site-packages / minio / replicationconfig.py
Size: Mime:
# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C)
# 2020 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Request/response of PutBucketReplication and GetBucketReplication APIs."""

from __future__ import absolute_import

from abc import ABCMeta

from .commonconfig import DISABLED, BaseRule, check_status
from .xml import Element, SubElement, find, findall, findtext


class Status(metaclass=ABCMeta):
    """Base for replication elements that hold only a ``Status`` child.

    The XML tag is derived from the concrete class name, so subclasses get
    matching ``fromxml``/``toxml`` behaviour without overriding anything.

    ``ABCMeta`` is attached through the Python 3 ``metaclass=`` keyword;
    a ``__metaclass__`` class attribute is ignored by Python 3 and would
    leave the class with the plain ``type`` metaclass.
    """

    def __init__(self, status):
        """Validate ``status`` with ``check_status`` and store it."""
        check_status(status)
        self._status = status

    @property
    def status(self):
        """Get status."""
        return self._status

    @classmethod
    def fromxml(cls, element):
        """Create new object with values from XML element.

        ``element`` is the parent; the child named after ``cls`` is located
        first and its ``Status`` text becomes the constructor argument.
        """
        element = find(element, cls.__name__)
        # NOTE(review): the third argument presumably marks the tag as
        # required -- confirm against ``.xml.findtext``.
        return cls(findtext(element, "Status", True))

    def toxml(self, element):
        """Convert to XML.

        Appends a child named after the concrete class to ``element``, adds
        a ``Status`` sub-element built from the stored status, and returns
        the new child.
        """
        element = SubElement(element, self.__class__.__name__)
        SubElement(element, "Status", self._status)
        return element


class SseKmsEncryptedObjects(Status):
    """SSE KMS encrypted objects.

    Adds nothing to ``Status``; the class name supplies the
    ``SseKmsEncryptedObjects`` tag used by the inherited
    ``fromxml``/``toxml``.
    """


class SourceSelectionCriteria:
    """Source selection criteria of a replication rule.

    Wraps an optional ``SseKmsEncryptedObjects`` value.
    """

    def __init__(self, sse_kms_encrypted_objects=None):
        """Store the optional SSE KMS encrypted objects setting as given."""
        self._sse_kms_encrypted_objects = sse_kms_encrypted_objects

    @property
    def sse_kms_encrypted_objects(self):
        """Return the SSE KMS encrypted objects setting (may be ``None``)."""
        return self._sse_kms_encrypted_objects

    @classmethod
    def fromxml(cls, element):
        """Build an instance from the parent of ``SourceSelectionCriteria``."""
        criteria = find(element, "SourceSelectionCriteria")
        # The nested element is optional; only parse it when it is there.
        if find(criteria, "SseKmsEncryptedObjects") is None:
            return cls(None)
        return cls(SseKmsEncryptedObjects.fromxml(criteria))

    def toxml(self, element):
        """Append ``SourceSelectionCriteria`` to ``element`` and return it."""
        criteria = SubElement(element, "SourceSelectionCriteria")
        sse_kms = self._sse_kms_encrypted_objects
        if sse_kms:
            sse_kms.toxml(criteria)
        return criteria


class ExistingObjectReplication(Status):
    """Existing object replication.

    Adds nothing to ``Status``; the class name supplies the
    ``ExistingObjectReplication`` tag used by the inherited
    ``fromxml``/``toxml``.
    """


class DeleteMarkerReplication(Status):
    """Delete marker replication.

    A ``Status`` element whose status falls back to ``DISABLED`` when the
    caller does not supply one.
    """

    def __init__(self, status=DISABLED):
        # Only the default differs from the base class; validation and
        # storage of the value are delegated to Status.__init__.
        super().__init__(status)


class ReplicationTimeValue(metaclass=ABCMeta):
    """Base for replication elements that hold an optional ``Minutes`` child.

    The XML tag is derived from the concrete class name, so subclasses get
    matching ``fromxml``/``toxml`` behaviour without overriding anything.

    ``ABCMeta`` is attached through the Python 3 ``metaclass=`` keyword;
    a ``__metaclass__`` class attribute is ignored by Python 3 and would
    leave the class with the plain ``type`` metaclass.
    """

    def __init__(self, minutes=15):
        """Store ``minutes`` as given (defaults to 15; ``None`` is kept)."""
        self._minutes = minutes

    @property
    def minutes(self):
        """Get minutes."""
        return self._minutes

    @classmethod
    def fromxml(cls, element):
        """Create new object with values from XML element.

        ``element`` is the parent; the child named after ``cls`` is located
        first. When a ``Minutes`` text is found it is converted to ``int``;
        otherwise ``None`` is passed on explicitly, so the constructor
        default is not applied.
        """
        element = find(element, cls.__name__)
        minutes = findtext(element, "Minutes")
        if minutes is not None:
            minutes = int(minutes)
        return cls(minutes)

    def toxml(self, element):
        """Convert to XML.

        Appends a child named after the concrete class to ``element`` and
        returns it; ``Minutes`` is only emitted when a value is stored.
        """
        element = SubElement(element, self.__class__.__name__)
        if self._minutes is not None:
            SubElement(element, "Minutes", str(self._minutes))
        return element


class Time(ReplicationTimeValue):
    """Time.

    Adds nothing to ``ReplicationTimeValue``; the class name supplies the
    ``Time`` tag used by the inherited ``fromxml``/``toxml``.
    """


class ReplicationTime:
    """Replication time: a ``Time`` value paired with a status."""

    def __init__(self, time, status):
        """Store ``time`` and ``status``.

        Raises ``ValueError`` when ``time`` is falsy; ``status`` is passed
        through ``check_status`` before being stored.
        """
        if not time:
            raise ValueError("time must be provided")
        check_status(status)
        self._time, self._status = time, status

    @property
    def time(self):
        """Return the stored time value."""
        return self._time

    @property
    def status(self):
        """Return the stored status."""
        return self._status

    @classmethod
    def fromxml(cls, element):
        """Build an instance from the parent of ``ReplicationTime``."""
        node = find(element, "ReplicationTime")
        # Time is parsed before Status is read.
        return cls(Time.fromxml(node), findtext(node, "Status", True))

    def toxml(self, element):
        """Append ``ReplicationTime`` to ``element`` and return it."""
        node = SubElement(element, "ReplicationTime")
        self._time.toxml(node)
        SubElement(node, "Status", self._status)
        return node


class EventThreshold(ReplicationTimeValue):
    """Event threshold.

    Adds nothing to ``ReplicationTimeValue``; the class name supplies the
    ``EventThreshold`` tag used by the inherited ``fromxml``/``toxml``.
    """


class Metrics:
    """Metrics: an ``EventThreshold`` value paired with a status."""

    def __init__(self, event_threshold, status):
        """Store ``event_threshold`` and ``status``.

        Raises ``ValueError`` when ``event_threshold`` is falsy; ``status``
        is passed through ``check_status`` before being stored.
        """
        if not event_threshold:
            raise ValueError("event threshold must be provided")
        check_status(status)
        self._event_threshold, self._status = event_threshold, status

    @property
    def event_threshold(self):
        """Return the stored event threshold."""
        return self._event_threshold

    @property
    def status(self):
        """Return the stored status."""
        return self._status

    @classmethod
    def fromxml(cls, element):
        """Build an instance from the parent of ``Metrics``."""
        node = find(element, "Metrics")
        # EventThreshold is parsed before Status is read.
        return cls(EventThreshold.fromxml(node), findtext(node, "Status", True))

    def toxml(self, element):
        """Append ``Metrics`` to ``element`` and return it."""
        node = SubElement(element, "Metrics")
        self._event_threshold.toxml(node)
        SubElement(node, "Status", self._status)
        return node


class EncryptionConfig:
    """Encryption configuration holding an optional replica KMS key ID."""

    def __init__(self, replica_kms_key_id=None):
        """Store the replica KMS key ID as given."""
        self._replica_kms_key_id = replica_kms_key_id

    @property
    def replica_kms_key_id(self):
        """Return the replica KMS key ID (may be ``None``)."""
        return self._replica_kms_key_id

    @classmethod
    def fromxml(cls, element):
        """Build an instance from the parent of ``EncryptionConfiguration``."""
        config = find(element, "EncryptionConfiguration")
        key_id = findtext(config, "ReplicaKmsKeyID")
        return cls(key_id)

    def toxml(self, element):
        """Append ``EncryptionConfiguration`` to ``element`` and return it."""
        config = SubElement(element, "EncryptionConfiguration")
        # ReplicaKmsKeyID is always emitted, even when no key ID is stored.
        SubElement(config, "ReplicaKmsKeyID", self._replica_kms_key_id)
        return config


class AccessControlTranslation:
    """Access control translation holding the replica owner."""

    def __init__(self, owner="Destination"):
        """Store ``owner``; raise ``ValueError`` when it is falsy."""
        if not owner:
            raise ValueError("owner must be provided")
        self._owner = owner

    @property
    def owner(self):
        """Return the stored owner."""
        return self._owner

    @classmethod
    def fromxml(cls, element):
        """Build an instance from the parent of ``AccessControlTranslation``."""
        translation = find(element, "AccessControlTranslation")
        owner = findtext(translation, "Owner")
        return cls(owner)

    def toxml(self, element):
        """Append ``AccessControlTranslation`` to ``element`` and return it."""
        translation = SubElement(element, "AccessControlTranslation")
        SubElement(translation, "Owner", self._owner)
        return translation


class Destination:
    """Replication destination.

    Holds the mandatory destination bucket ARN together with the optional
    access control translation, account, encryption configuration, metrics,
    replication time and storage class.
    """

    def __init__(self, bucket_arn,
                 access_control_translation=None, account=None,
                 encryption_config=None, metrics=None,
                 replication_time=None, storage_class=None):
        """Store all settings; raise ``ValueError`` if ``bucket_arn`` is falsy."""
        if not bucket_arn:
            raise ValueError("bucket ARN must be provided")
        # Plain string-like settings.
        self._bucket_arn = bucket_arn
        self._account = account
        self._storage_class = storage_class
        # Nested configuration objects.
        self._access_control_translation = access_control_translation
        self._encryption_config = encryption_config
        self._metrics = metrics
        self._replication_time = replication_time

    @property
    def bucket_arn(self):
        """Return the bucket ARN."""
        return self._bucket_arn

    @property
    def access_control_translation(self):
        """Return the access control translation (may be ``None``)."""
        return self._access_control_translation

    @property
    def account(self):
        """Return the account (may be ``None``)."""
        return self._account

    @property
    def encryption_config(self):
        """Return the encryption configuration (may be ``None``)."""
        return self._encryption_config

    @property
    def metrics(self):
        """Return the metrics (may be ``None``)."""
        return self._metrics

    @property
    def replication_time(self):
        """Return the replication time (may be ``None``)."""
        return self._replication_time

    @property
    def storage_class(self):
        """Return the storage class (may be ``None``)."""
        return self._storage_class

    @classmethod
    def fromxml(cls, element):
        """Build an instance from the parent of ``Destination``."""
        node = find(element, "Destination")

        def parse_if_present(tag, parser):
            # Optional children are only parsed when their tag exists.
            if find(node, tag) is None:
                return None
            return parser.fromxml(node)

        # Children are read in the same sequence as before so that the
        # first failure, if any, is unchanged.
        translation = parse_if_present(
            "AccessControlTranslation", AccessControlTranslation,
        )
        account = findtext(node, "Account")
        bucket_arn = findtext(node, "Bucket", True)
        encryption = parse_if_present(
            "EncryptionConfiguration", EncryptionConfig,
        )
        metrics = parse_if_present("Metrics", Metrics)
        replication_time = parse_if_present(
            "ReplicationTime", ReplicationTime,
        )
        storage_class = findtext(node, "StorageClass")
        return cls(bucket_arn, translation, account,
                   encryption, metrics, replication_time, storage_class)

    def toxml(self, element):
        """Append ``Destination`` to ``element`` and return it."""
        node = SubElement(element, "Destination")
        if self._access_control_translation:
            self._access_control_translation.toxml(node)
        if self._account is not None:
            SubElement(node, "Account", self._account)
        SubElement(node, "Bucket", self._bucket_arn)
        # Remaining nested objects are written in this fixed order.
        optional_children = (
            self._encryption_config,
            self._metrics,
            self._replication_time,
        )
        for child in optional_children:
            if child:
                child.toxml(node)
        if self._storage_class:
            SubElement(node, "StorageClass", self._storage_class)
        return node


class Rule(BaseRule):
    """Replication rule.

    Combines a mandatory destination and status with the optional delete
    marker replication, existing object replication, prefix, priority and
    source selection criteria; filter and ID handling come from
    ``BaseRule``.
    """

    def __init__(self, destination, status,
                 delete_marker_replication=None,
                 existing_object_replication=None,
                 rule_filter=None, rule_id=None, prefix=None,
                 priority=None, source_selection_criteria=None):
        """Store the rule settings.

        Raises ``ValueError`` when ``destination`` is falsy; ``status`` is
        passed through ``check_status``; ``rule_filter`` and ``rule_id``
        are handed to ``BaseRule.__init__``.
        """
        if not destination:
            raise ValueError("destination must be provided")
        check_status(status)
        super().__init__(rule_filter, rule_id)

        self._destination = destination
        self._status = status
        # When a filter is given without delete marker replication, a
        # default DeleteMarkerReplication() is stored instead.
        if not rule_filter or delete_marker_replication:
            self._delete_marker_replication = delete_marker_replication
        else:
            self._delete_marker_replication = DeleteMarkerReplication()
        self._existing_object_replication = existing_object_replication
        self._prefix = prefix
        self._priority = priority
        self._source_selection_criteria = source_selection_criteria

    @property
    def destination(self):
        """Return the destination."""
        return self._destination

    @property
    def status(self):
        """Return the status."""
        return self._status

    @property
    def delete_marker_replication(self):
        """Return the delete marker replication (may be ``None``)."""
        return self._delete_marker_replication

    @property
    def existing_object_replication(self):
        """Return the existing object replication (may be ``None``)."""
        return self._existing_object_replication

    @property
    def prefix(self):
        """Return the prefix (may be ``None``)."""
        return self._prefix

    @property
    def priority(self):
        """Return the priority (may be ``None``)."""
        return self._priority

    @property
    def source_selection_criteria(self):
        """Return the source selection criteria (may be ``None``)."""
        return self._source_selection_criteria

    @classmethod
    def fromxml(cls, element):
        """Build a rule from ``element``, whose children are read directly."""

        def parse_if_present(tag, parser):
            # Optional children are only parsed when their tag exists.
            if find(element, tag) is None:
                return None
            return parser.fromxml(element)

        # Children are read in the same sequence as before so that the
        # first failure, if any, is unchanged.
        delete_marker = parse_if_present(
            "DeleteMarkerReplication", DeleteMarkerReplication,
        )
        destination = Destination.fromxml(element)
        existing_object = parse_if_present(
            "ExistingObjectReplication", ExistingObjectReplication,
        )
        rule_filter, rule_id = cls.parsexml(element)
        prefix = findtext(element, "Prefix")
        priority_text = findtext(element, "Priority")
        # Falsy lookups (missing or empty) are passed through unconverted.
        priority = int(priority_text) if priority_text else priority_text
        criteria = parse_if_present(
            "SourceSelectionCriteria", SourceSelectionCriteria,
        )
        status = findtext(element, "Status", True)

        return cls(destination, status, delete_marker,
                   existing_object, rule_filter,
                   rule_id, prefix, priority, criteria)

    def toxml(self, element):
        """Append a ``Rule`` child to ``element`` and return it."""
        rule = SubElement(element, "Rule")
        if self._delete_marker_replication:
            self._delete_marker_replication.toxml(rule)
        self._destination.toxml(rule)
        if self._existing_object_replication:
            self._existing_object_replication.toxml(rule)
        # BaseRule contributes its own children at this position.
        super().toxml(rule)
        if self._prefix is not None:
            SubElement(rule, "Prefix", self._prefix)
        if self._priority is not None:
            SubElement(rule, "Priority", str(self._priority))
        if self._source_selection_criteria:
            self._source_selection_criteria.toxml(rule)
        SubElement(rule, "Status", self._status)
        return rule


class ReplicationConfig:
    """Replication configuration: a role plus between 1 and 1000 rules."""

    def __init__(self, role, rules):
        """Store ``role`` and ``rules``.

        Raises ``ValueError`` when ``rules`` is falsy or holds more than
        1000 entries.
        """
        if not rules:
            raise ValueError("rules must be provided")
        if len(rules) > 1000:
            raise ValueError("more than 1000 rules are not supported")
        self._role, self._rules = role, rules

    @property
    def role(self):
        """Return the role."""
        return self._role

    @property
    def rules(self):
        """Return the rules."""
        return self._rules

    @classmethod
    def fromxml(cls, element):
        """Build a configuration from ``element``.

        ``Role`` and every ``Rule`` child are looked up directly on
        ``element``.
        """
        role = findtext(element, "Role", True)
        rules = [Rule.fromxml(tag) for tag in findall(element, "Rule")]
        return cls(role, rules)

    def toxml(self, element):
        """Return a new ``ReplicationConfiguration`` element.

        The ``element`` argument is not used; a fresh root is always
        created and populated with ``Role`` and each rule.
        """
        root = Element("ReplicationConfiguration")
        SubElement(root, "Role", self._role)
        for rule in self._rules:
            rule.toxml(root)
        return root