Repository URL to install this package:
|
Version:
1.11.0 ▾
|
# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C)
# 2020 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Request/response of PutBucketNotificationConfiguration and
GetBucketNotiicationConfiguration APIs.
"""
from __future__ import absolute_import
from abc import ABCMeta
from .xml import Element, SubElement, find, findall, findtext
class FilterRule:
"""Filter rule."""
__metaclass__ = ABCMeta
def __init__(self, name, value):
self._name = name
self._value = value
@property
def name(self):
"""Get name."""
return self._name
@property
def value(self):
"""Get value."""
return self._value
@classmethod
def fromxml(cls, element):
"""Create new object with values from XML element."""
name = findtext(element, "Name")
value = findtext(element, "Value")
return cls(name, value)
def toxml(self, element):
"""Convert to XML."""
element = SubElement(element, "FilterRule")
SubElement(element, "Name", self._name)
SubElement(element, "Value", self._value)
return element
class PrefixFilterRule(FilterRule):
"""Prefix filter rule."""
def __init__(self, value):
super().__init__("prefix", value)
class SuffixFilterRule(FilterRule):
"""Suffix filter rule."""
def __init__(self, value):
super().__init__("suffix", value)
class CommonConfig:
"""Common for cloud-function/queue/topic configuration."""
__metaclass__ = ABCMeta
def __init__(self, events, config_id, prefix_filter_rule,
suffix_filter_rule):
if not events:
raise ValueError("events must be provided")
self._events = events
self._config_id = config_id
self._prefix_filter_rule = prefix_filter_rule
self._suffix_filter_rule = suffix_filter_rule
@property
def events(self):
"""Get events."""
return self._events
@property
def config_id(self):
"""Get configuration ID."""
return self._config_id
@property
def prefix_filter_rule(self):
"""Get prefix filter rule."""
return self._prefix_filter_rule
@property
def suffix_filter_rule(self):
"""Get suffix filter rule."""
return self._suffix_filter_rule
@staticmethod
def parsexml(element):
"""Parse XML."""
elements = findall(element, "Event")
events = [tag.text for tag in elements]
config_id = findtext(element, "Id")
prefix_filter_rule = None
suffix_filter_rule = None
element = find(element, "Filter")
if element is not None:
element = find(element, "S3Key")
elements = findall(element, "FilterRule")
for tag in elements:
filter_rule = FilterRule.fromxml(tag)
if filter_rule.name == "prefix":
prefix_filter_rule = PrefixFilterRule(filter_rule.value)
else:
suffix_filter_rule = SuffixFilterRule(filter_rule.value)
return events, config_id, prefix_filter_rule, suffix_filter_rule
def toxml(self, element):
"""Convert to XML."""
for event in self._events:
SubElement(element, "Event", event)
if self._config_id is not None:
SubElement(element, "Id", self._config_id)
if self._prefix_filter_rule or self._suffix_filter_rule:
rule = SubElement(element, "Filter")
rule = SubElement(rule, "S3Key")
if self._prefix_filter_rule:
self._prefix_filter_rule.toxml(rule)
if self._suffix_filter_rule:
self._suffix_filter_rule.toxml(rule)
return element
class CloudFuncConfig(CommonConfig):
"""Cloud function configuration."""
def __init__(self, cloud_func, events, config_id=None,
prefix_filter_rule=None, suffix_filter_rule=None):
if not cloud_func:
raise ValueError("cloud function must be provided")
self._cloud_func = cloud_func
super().__init__(
events, config_id, prefix_filter_rule, suffix_filter_rule,
)
@property
def cloud_func(self):
"""Get cloud function ARN."""
return self._cloud_func
@classmethod
def fromxml(cls, element):
"""Create new object with values from XML element."""
cloud_func = findtext(element, "CloudFunction", True)
(events, config_id, prefix_filter_rule,
suffix_filter_rule) = cls.parsexml(element)
return cls(
cloud_func,
events,
config_id,
prefix_filter_rule,
suffix_filter_rule
)
def toxml(self, element):
"""Convert to XML."""
element = SubElement(element, "CloudFunctionConfiguration")
SubElement(element, "CloudFunction", self._cloud_func)
super().toxml(element)
return element
class QueueConfig(CommonConfig):
"""Queue configuration."""
def __init__(self, queue, events, config_id=None,
prefix_filter_rule=None, suffix_filter_rule=None):
if not queue:
raise ValueError("queue must be provided")
self._queue = queue
super().__init__(
events, config_id, prefix_filter_rule, suffix_filter_rule,
)
@property
def queue(self):
"""Get queue ARN."""
return self._queue
@classmethod
def fromxml(cls, element):
"""Create new object with values from XML element."""
queue = findtext(element, "Queue", True)
(events, config_id, prefix_filter_rule,
suffix_filter_rule) = cls.parsexml(element)
return cls(
queue,
events,
config_id,
prefix_filter_rule,
suffix_filter_rule
)
def toxml(self, element):
"""Convert to XML."""
element = SubElement(element, "QueueConfiguration")
SubElement(element, "Queue", self._queue)
super().toxml(element)
return element
class TopicConfig(CommonConfig):
"""Get topic configuration."""
def __init__(self, topic, events, config_id=None,
prefix_filter_rule=None, suffix_filter_rule=None):
if not topic:
raise ValueError("topic must be provided")
self._topic = topic
super().__init__(
events, config_id, prefix_filter_rule, suffix_filter_rule,
)
@property
def topic(self):
"""Get topic ARN."""
return self._topic
@classmethod
def fromxml(cls, element):
"""Create new object with values from XML element."""
topic = findtext(element, "Topic", True)
(events, config_id, prefix_filter_rule,
suffix_filter_rule) = cls.parsexml(element)
return cls(
topic,
events,
config_id,
prefix_filter_rule,
suffix_filter_rule
)
def toxml(self, element):
"""Convert to XML."""
element = SubElement(element, "TopicConfiguration")
SubElement(element, "Topic", self._topic)
super().toxml(element)
return element
class NotificationConfig:
"""Notification configuration."""
def __init__(self, cloud_func_config_list=None, queue_config_list=None,
topic_config_list=None):
self._cloud_func_config_list = cloud_func_config_list or []
self._queue_config_list = queue_config_list or []
self._topic_config_list = topic_config_list or []
@property
def cloud_func_config_list(self):
"""Get cloud function configuration list."""
return self._cloud_func_config_list
@property
def queue_config_list(self):
"""Get queue configuration list."""
return self._queue_config_list
@property
def topic_config_list(self):
"""Get topic configuration list."""
return self._topic_config_list
@classmethod
def fromxml(cls, element):
"""Create new object with values from XML element."""
elements = findall(element, "CloudFunctionConfiguration")
cloud_func_config_list = []
for tag in elements:
cloud_func_config_list.append(CloudFuncConfig.fromxml(tag))
elements = findall(element, "QueueConfiguration")
queue_config_list = []
for tag in elements:
queue_config_list.append(QueueConfig.fromxml(tag))
elements = findall(element, "TopicConfiguration")
topic_config_list = []
for tag in elements:
topic_config_list.append(TopicConfig.fromxml(tag))
return cls(
cloud_func_config_list, queue_config_list, topic_config_list,
)
def toxml(self, element):
"""Convert to XML."""
element = Element("NotificationConfiguration")
for config in self._cloud_func_config_list:
config.toxml(element)
for config in self._queue_config_list:
config.toxml(element)
for config in self._topic_config_list:
config.toxml(element)
return element