# Repository URL to install this package:
# Version: 1.11.0
# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C)
# 2020 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Request/response of PutBucketReplication and GetBucketReplication APIs."""
from __future__ import absolute_import
from abc import ABCMeta
from .commonconfig import DISABLED, BaseRule, check_status
from .xml import Element, SubElement, find, findall, findtext
class Status:
    """Base class wrapping a single status value.

    The XML form is an element named after the concrete class that
    contains one ``Status`` child holding the value.
    """

    # NOTE: Python 3 does not consult ``__metaclass__`` when building a
    # class, so this is only a plain class attribute there.
    __metaclass__ = ABCMeta

    def __init__(self, status):
        # check_status() is a project helper; it is given the value
        # before the value is stored.
        check_status(status)
        self._status = status

    @property
    def status(self):
        """Return the stored status value."""
        return self._status

    @classmethod
    def fromxml(cls, element):
        """Build an instance from the child of *element* named after *cls*."""
        node = find(element, cls.__name__)
        # The third argument is passed through to findtext() unchanged
        # (presumably marks the tag as required -- confirm in .xml).
        status = findtext(node, "Status", True)
        return cls(status)

    def toxml(self, element):
        """Append this object's XML under *element* and return the new node."""
        node = SubElement(element, type(self).__name__)
        SubElement(node, "Status", self._status)
        return node
class SseKmsEncryptedObjects(Status):
    """Status element named ``SseKmsEncryptedObjects``.

    All behaviour is inherited from ``Status``; the class name supplies
    the XML tag.
    """
class SourceSelectionCriteria:
    """Container for the optional ``SseKmsEncryptedObjects`` setting."""

    def __init__(self, sse_kms_encrypted_objects=None):
        self._sse_kms_encrypted_objects = sse_kms_encrypted_objects

    @property
    def sse_kms_encrypted_objects(self):
        """Return the SSE KMS encrypted objects setting, or None."""
        return self._sse_kms_encrypted_objects

    @classmethod
    def fromxml(cls, element):
        """Build an instance from the ``SourceSelectionCriteria`` child."""
        node = find(element, "SourceSelectionCriteria")
        # The nested setting is optional; only parse it when present.
        if find(node, "SseKmsEncryptedObjects") is None:
            return cls(None)
        return cls(SseKmsEncryptedObjects.fromxml(node))

    def toxml(self, element):
        """Append ``SourceSelectionCriteria`` to *element* and return it."""
        node = SubElement(element, "SourceSelectionCriteria")
        nested = self._sse_kms_encrypted_objects
        if nested:
            nested.toxml(node)
        return node
class ExistingObjectReplication(Status):
    """Status element named ``ExistingObjectReplication``.

    All behaviour is inherited from ``Status``; the class name supplies
    the XML tag.
    """
class DeleteMarkerReplication(Status):
    """Status element named ``DeleteMarkerReplication``.

    Unlike the base class, the status argument is optional and falls
    back to ``DISABLED``.
    """

    def __init__(self, status=DISABLED):
        super().__init__(status)
class ReplicationTimeValue:
    """Base class holding a number of minutes.

    The XML form is an element named after the concrete class with an
    optional ``Minutes`` child.
    """

    # NOTE: Python 3 does not consult ``__metaclass__`` when building a
    # class, so this is only a plain class attribute there.
    __metaclass__ = ABCMeta

    def __init__(self, minutes=15):
        self._minutes = minutes

    @property
    def minutes(self):
        """Return the stored minutes value (may be None)."""
        return self._minutes

    @classmethod
    def fromxml(cls, element):
        """Build an instance from the child of *element* named after *cls*."""
        node = find(element, cls.__name__)
        text = findtext(node, "Minutes")
        # A missing Minutes tag is passed on as None rather than the
        # constructor default.
        return cls(None if text is None else int(text))

    def toxml(self, element):
        """Append this object's XML under *element* and return the new node."""
        node = SubElement(element, type(self).__name__)
        if self._minutes is None:
            return node
        SubElement(node, "Minutes", str(self._minutes))
        return node
class Time(ReplicationTimeValue):
    """Minutes value serialized under the ``Time`` tag.

    All behaviour is inherited from ``ReplicationTimeValue``.
    """
class ReplicationTime:
    """Pairs a ``Time`` value with a status.

    Raises ``ValueError`` from the constructor when *time* is falsy.
    """

    def __init__(self, time, status):
        if not time:
            raise ValueError("time must be provided")
        # check_status() is a project helper; it is given the value
        # before anything is stored.
        check_status(status)
        self._time, self._status = time, status

    @property
    def time(self):
        """Return the time value object."""
        return self._time

    @property
    def status(self):
        """Return the status value."""
        return self._status

    @classmethod
    def fromxml(cls, element):
        """Build an instance from the ``ReplicationTime`` child of *element*."""
        node = find(element, "ReplicationTime")
        parsed_time = Time.fromxml(node)
        return cls(parsed_time, findtext(node, "Status", True))

    def toxml(self, element):
        """Append ``ReplicationTime`` to *element* and return the new node."""
        node = SubElement(element, "ReplicationTime")
        # Child order: the time element first, then Status.
        self._time.toxml(node)
        SubElement(node, "Status", self._status)
        return node
class EventThreshold(ReplicationTimeValue):
    """Minutes value serialized under the ``EventThreshold`` tag.

    All behaviour is inherited from ``ReplicationTimeValue``.
    """
class Metrics:
    """Pairs an ``EventThreshold`` value with a status.

    Raises ``ValueError`` from the constructor when *event_threshold*
    is falsy.
    """

    def __init__(self, event_threshold, status):
        if not event_threshold:
            raise ValueError("event threshold must be provided")
        # check_status() is a project helper; it is given the value
        # before anything is stored.
        check_status(status)
        self._event_threshold, self._status = event_threshold, status

    @property
    def event_threshold(self):
        """Return the event threshold object."""
        return self._event_threshold

    @property
    def status(self):
        """Return the status value."""
        return self._status

    @classmethod
    def fromxml(cls, element):
        """Build an instance from the ``Metrics`` child of *element*."""
        node = find(element, "Metrics")
        threshold = EventThreshold.fromxml(node)
        return cls(threshold, findtext(node, "Status", True))

    def toxml(self, element):
        """Append ``Metrics`` to *element* and return the new node."""
        node = SubElement(element, "Metrics")
        # Child order: the threshold element first, then Status.
        self._event_threshold.toxml(node)
        SubElement(node, "Status", self._status)
        return node
class EncryptionConfig:
    """Holds the replica KMS key ID.

    Read from and written to an ``EncryptionConfiguration`` element.
    """

    def __init__(self, replica_kms_key_id=None):
        self._replica_kms_key_id = replica_kms_key_id

    @property
    def replica_kms_key_id(self):
        """Return the replica KMS key ID, or None."""
        return self._replica_kms_key_id

    @classmethod
    def fromxml(cls, element):
        """Build an instance from the ``EncryptionConfiguration`` child."""
        node = find(element, "EncryptionConfiguration")
        key_id = findtext(node, "ReplicaKmsKeyID")
        return cls(key_id)

    def toxml(self, element):
        """Append ``EncryptionConfiguration`` to *element* and return it."""
        node = SubElement(element, "EncryptionConfiguration")
        # The key ID child is always written, even when the ID is None.
        SubElement(node, "ReplicaKmsKeyID", self._replica_kms_key_id)
        return node
class AccessControlTranslation:
    """Holds the owner value of an ``AccessControlTranslation`` element.

    The owner defaults to ``"Destination"``; a falsy owner raises
    ``ValueError``.
    """

    def __init__(self, owner="Destination"):
        if owner:
            self._owner = owner
            return
        raise ValueError("owner must be provided")

    @property
    def owner(self):
        """Return the owner value."""
        return self._owner

    @classmethod
    def fromxml(cls, element):
        """Build an instance from the ``AccessControlTranslation`` child."""
        node = find(element, "AccessControlTranslation")
        owner = findtext(node, "Owner")
        return cls(owner)

    def toxml(self, element):
        """Append ``AccessControlTranslation`` to *element* and return it."""
        node = SubElement(element, "AccessControlTranslation")
        SubElement(node, "Owner", self._owner)
        return node
class Destination:
    """Replication destination.

    Only the bucket ARN is mandatory (a falsy value raises
    ``ValueError``); every other setting is optional and defaults to
    None.
    """

    def __init__(self, bucket_arn,
                 access_control_translation=None, account=None,
                 encryption_config=None, metrics=None,
                 replication_time=None, storage_class=None):
        if not bucket_arn:
            raise ValueError("bucket ARN must be provided")
        self._bucket_arn = bucket_arn
        self._access_control_translation = access_control_translation
        self._account = account
        self._encryption_config = encryption_config
        self._metrics = metrics
        self._replication_time = replication_time
        self._storage_class = storage_class

    @property
    def bucket_arn(self):
        """Return the bucket ARN."""
        return self._bucket_arn

    @property
    def access_control_translation(self):
        """Return the access control translation, or None."""
        return self._access_control_translation

    @property
    def account(self):
        """Return the account, or None."""
        return self._account

    @property
    def encryption_config(self):
        """Return the encryption configuration, or None."""
        return self._encryption_config

    @property
    def metrics(self):
        """Return the metrics setting, or None."""
        return self._metrics

    @property
    def replication_time(self):
        """Return the replication time setting, or None."""
        return self._replication_time

    @property
    def storage_class(self):
        """Return the storage class, or None."""
        return self._storage_class

    @classmethod
    def fromxml(cls, element):
        """Build an instance from the ``Destination`` child of *element*."""
        node = find(element, "Destination")

        def optional(tag, parser):
            # Parse a nested object only when its tag is present.
            if find(node, tag) is None:
                return None
            return parser.fromxml(node)

        # Fields are read in the same sequence as the constructor call
        # below consumes them, except Bucket which is read third.
        translation = optional(
            "AccessControlTranslation", AccessControlTranslation,
        )
        account = findtext(node, "Account")
        bucket_arn = findtext(node, "Bucket", True)
        encryption = optional("EncryptionConfiguration", EncryptionConfig)
        metrics = optional("Metrics", Metrics)
        replication_time = optional("ReplicationTime", ReplicationTime)
        storage_class = findtext(node, "StorageClass")
        return cls(
            bucket_arn,
            translation,
            account,
            encryption,
            metrics,
            replication_time,
            storage_class,
        )

    def toxml(self, element):
        """Append ``Destination`` to *element* and return the new node."""
        node = SubElement(element, "Destination")
        if self._access_control_translation:
            self._access_control_translation.toxml(node)
        if self._account is not None:
            SubElement(node, "Account", self._account)
        SubElement(node, "Bucket", self._bucket_arn)
        # Remaining nested objects are written in this fixed order,
        # each only when set.
        nested = (
            self._encryption_config,
            self._metrics,
            self._replication_time,
        )
        for child in nested:
            if child:
                child.toxml(node)
        if self._storage_class:
            SubElement(node, "StorageClass", self._storage_class)
        return node
class Rule(BaseRule):
    """Replication rule.

    *destination* and *status* are required. When a rule filter is
    given without a delete marker replication setting, a default
    ``DeleteMarkerReplication()`` is used.
    """

    def __init__(self, destination, status,
                 delete_marker_replication=None,
                 existing_object_replication=None,
                 rule_filter=None, rule_id=None, prefix=None,
                 priority=None, source_selection_criteria=None):
        if not destination:
            raise ValueError("destination must be provided")
        check_status(status)
        super().__init__(rule_filter, rule_id)
        self._destination = destination
        self._status = status
        # Keep the caller's value unless a filter is set and no delete
        # marker replication was supplied.
        if delete_marker_replication or not rule_filter:
            self._delete_marker_replication = delete_marker_replication
        else:
            self._delete_marker_replication = DeleteMarkerReplication()
        self._existing_object_replication = existing_object_replication
        self._prefix = prefix
        self._priority = priority
        self._source_selection_criteria = source_selection_criteria

    @property
    def destination(self):
        """Return the destination."""
        return self._destination

    @property
    def status(self):
        """Return the status value."""
        return self._status

    @property
    def delete_marker_replication(self):
        """Return the delete marker replication setting, or None."""
        return self._delete_marker_replication

    @property
    def existing_object_replication(self):
        """Return the existing object replication setting, or None."""
        return self._existing_object_replication

    @property
    def prefix(self):
        """Return the prefix, or None."""
        return self._prefix

    @property
    def priority(self):
        """Return the priority, or None."""
        return self._priority

    @property
    def source_selection_criteria(self):
        """Return the source selection criteria, or None."""
        return self._source_selection_criteria

    @classmethod
    def fromxml(cls, element):
        """Build a rule from *element* (the rule element itself)."""

        def optional(tag, parser):
            # Parse a nested object only when its tag is present.
            if find(element, tag) is None:
                return None
            return parser.fromxml(element)

        delete_marker = optional(
            "DeleteMarkerReplication", DeleteMarkerReplication,
        )
        destination = Destination.fromxml(element)
        existing_object = optional(
            "ExistingObjectReplication", ExistingObjectReplication,
        )
        # parsexml() comes from BaseRule and yields the filter and ID.
        rule_filter, rule_id = cls.parsexml(element)
        prefix = findtext(element, "Prefix")
        priority_text = findtext(element, "Priority")
        # Falsy text (None or empty) is passed through unconverted.
        priority = int(priority_text) if priority_text else priority_text
        criteria = optional(
            "SourceSelectionCriteria", SourceSelectionCriteria,
        )
        status = findtext(element, "Status", True)
        return cls(
            destination,
            status,
            delete_marker,
            existing_object,
            rule_filter,
            rule_id,
            prefix,
            priority,
            criteria,
        )

    def toxml(self, element):
        """Append a ``Rule`` element to *element* and return the new node."""
        node = SubElement(element, "Rule")
        if self._delete_marker_replication:
            self._delete_marker_replication.toxml(node)
        self._destination.toxml(node)
        if self._existing_object_replication:
            self._existing_object_replication.toxml(node)
        # BaseRule.toxml() writes the parts owned by the base class.
        super().toxml(node)
        if self._prefix is not None:
            SubElement(node, "Prefix", self._prefix)
        if self._priority is not None:
            SubElement(node, "Priority", str(self._priority))
        if self._source_selection_criteria:
            self._source_selection_criteria.toxml(node)
        SubElement(node, "Status", self._status)
        return node
class ReplicationConfig:
    """Replication configuration: a role plus a list of rules.

    The constructor raises ``ValueError`` when *rules* is falsy or
    holds more than 1000 entries.
    """

    def __init__(self, role, rules):
        if not rules:
            raise ValueError("rules must be provided")
        rule_count = len(rules)
        if rule_count > 1000:
            raise ValueError("more than 1000 rules are not supported")
        self._role, self._rules = role, rules

    @property
    def role(self):
        """Return the role."""
        return self._role

    @property
    def rules(self):
        """Return the rules."""
        return self._rules

    @classmethod
    def fromxml(cls, element):
        """Build a configuration from *element* and its ``Rule`` children."""
        role = findtext(element, "Role", True)
        parsed_rules = [Rule.fromxml(node) for node in findall(element, "Rule")]
        return cls(role, parsed_rules)

    def toxml(self, element):
        """Return a new ``ReplicationConfiguration`` element.

        The *element* argument is not used; a fresh root is created.
        """
        root = Element("ReplicationConfiguration")
        SubElement(root, "Role", self._role)
        for rule in self._rules:
            rule.toxml(root)
        return root