Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ccc-model-manager / lib / python3.9 / site-packages / minio / notificationconfig.py
Size: Mime:
# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C)
# 2020 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Request/response of PutBucketNotificationConfiguration and
GetBucketNotiicationConfiguration APIs.
"""

from __future__ import absolute_import

from abc import ABCMeta

from .xml import Element, SubElement, find, findall, findtext


class FilterRule:
    """Filter rule."""

    __metaclass__ = ABCMeta

    def __init__(self, name, value):
        self._name = name
        self._value = value

    @property
    def name(self):
        """Get name."""
        return self._name

    @property
    def value(self):
        """Get value."""
        return self._value

    @classmethod
    def fromxml(cls, element):
        """Create new object with values from XML element."""
        name = findtext(element, "Name")
        value = findtext(element, "Value")
        return cls(name, value)

    def toxml(self, element):
        """Convert to XML."""
        element = SubElement(element, "FilterRule")
        SubElement(element, "Name", self._name)
        SubElement(element, "Value", self._value)
        return element


class PrefixFilterRule(FilterRule):
    """Prefix filter rule."""

    def __init__(self, value):
        super().__init__("prefix", value)


class SuffixFilterRule(FilterRule):
    """Suffix filter rule."""

    def __init__(self, value):
        super().__init__("suffix", value)


class CommonConfig:
    """Common for cloud-function/queue/topic configuration."""

    __metaclass__ = ABCMeta

    def __init__(self, events, config_id, prefix_filter_rule,
                 suffix_filter_rule):
        if not events:
            raise ValueError("events must be provided")
        self._events = events
        self._config_id = config_id
        self._prefix_filter_rule = prefix_filter_rule
        self._suffix_filter_rule = suffix_filter_rule

    @property
    def events(self):
        """Get events."""
        return self._events

    @property
    def config_id(self):
        """Get configuration ID."""
        return self._config_id

    @property
    def prefix_filter_rule(self):
        """Get prefix filter rule."""
        return self._prefix_filter_rule

    @property
    def suffix_filter_rule(self):
        """Get suffix filter rule."""
        return self._suffix_filter_rule

    @staticmethod
    def parsexml(element):
        """Parse XML."""
        elements = findall(element, "Event")
        events = [tag.text for tag in elements]
        config_id = findtext(element, "Id")
        prefix_filter_rule = None
        suffix_filter_rule = None
        element = find(element, "Filter")
        if element is not None:
            element = find(element, "S3Key")
            elements = findall(element, "FilterRule")
            for tag in elements:
                filter_rule = FilterRule.fromxml(tag)
                if filter_rule.name == "prefix":
                    prefix_filter_rule = PrefixFilterRule(filter_rule.value)
                else:
                    suffix_filter_rule = SuffixFilterRule(filter_rule.value)
        return events, config_id, prefix_filter_rule, suffix_filter_rule

    def toxml(self, element):
        """Convert to XML."""
        for event in self._events:
            SubElement(element, "Event", event)
        if self._config_id is not None:
            SubElement(element, "Id", self._config_id)
        if self._prefix_filter_rule or self._suffix_filter_rule:
            rule = SubElement(element, "Filter")
            rule = SubElement(rule, "S3Key")
        if self._prefix_filter_rule:
            self._prefix_filter_rule.toxml(rule)
        if self._suffix_filter_rule:
            self._suffix_filter_rule.toxml(rule)
        return element


class CloudFuncConfig(CommonConfig):
    """Cloud function configuration."""

    def __init__(self, cloud_func, events, config_id=None,
                 prefix_filter_rule=None, suffix_filter_rule=None):
        if not cloud_func:
            raise ValueError("cloud function must be provided")
        self._cloud_func = cloud_func
        super().__init__(
            events, config_id, prefix_filter_rule, suffix_filter_rule,
        )

    @property
    def cloud_func(self):
        """Get cloud function ARN."""
        return self._cloud_func

    @classmethod
    def fromxml(cls, element):
        """Create new object with values from XML element."""
        cloud_func = findtext(element, "CloudFunction", True)
        (events, config_id, prefix_filter_rule,
         suffix_filter_rule) = cls.parsexml(element)
        return cls(
            cloud_func,
            events,
            config_id,
            prefix_filter_rule,
            suffix_filter_rule
        )

    def toxml(self, element):
        """Convert to XML."""
        element = SubElement(element, "CloudFunctionConfiguration")
        SubElement(element, "CloudFunction", self._cloud_func)
        super().toxml(element)
        return element


class QueueConfig(CommonConfig):
    """Queue configuration."""

    def __init__(self, queue, events, config_id=None,
                 prefix_filter_rule=None, suffix_filter_rule=None):
        if not queue:
            raise ValueError("queue must be provided")
        self._queue = queue
        super().__init__(
            events, config_id, prefix_filter_rule, suffix_filter_rule,
        )

    @property
    def queue(self):
        """Get queue ARN."""
        return self._queue

    @classmethod
    def fromxml(cls, element):
        """Create new object with values from XML element."""
        queue = findtext(element, "Queue", True)
        (events, config_id, prefix_filter_rule,
         suffix_filter_rule) = cls.parsexml(element)
        return cls(
            queue,
            events,
            config_id,
            prefix_filter_rule,
            suffix_filter_rule
        )

    def toxml(self, element):
        """Convert to XML."""
        element = SubElement(element, "QueueConfiguration")
        SubElement(element, "Queue", self._queue)
        super().toxml(element)
        return element


class TopicConfig(CommonConfig):
    """Get topic configuration."""

    def __init__(self, topic, events, config_id=None,
                 prefix_filter_rule=None, suffix_filter_rule=None):
        if not topic:
            raise ValueError("topic must be provided")
        self._topic = topic
        super().__init__(
            events, config_id, prefix_filter_rule, suffix_filter_rule,
        )

    @property
    def topic(self):
        """Get topic ARN."""
        return self._topic

    @classmethod
    def fromxml(cls, element):
        """Create new object with values from XML element."""
        topic = findtext(element, "Topic", True)
        (events, config_id, prefix_filter_rule,
         suffix_filter_rule) = cls.parsexml(element)
        return cls(
            topic,
            events,
            config_id,
            prefix_filter_rule,
            suffix_filter_rule
        )

    def toxml(self, element):
        """Convert to XML."""
        element = SubElement(element, "TopicConfiguration")
        SubElement(element, "Topic", self._topic)
        super().toxml(element)
        return element


class NotificationConfig:
    """Notification configuration."""

    def __init__(self, cloud_func_config_list=None, queue_config_list=None,
                 topic_config_list=None):
        self._cloud_func_config_list = cloud_func_config_list or []
        self._queue_config_list = queue_config_list or []
        self._topic_config_list = topic_config_list or []

    @property
    def cloud_func_config_list(self):
        """Get cloud function configuration list."""
        return self._cloud_func_config_list

    @property
    def queue_config_list(self):
        """Get queue configuration list."""
        return self._queue_config_list

    @property
    def topic_config_list(self):
        """Get topic configuration list."""
        return self._topic_config_list

    @classmethod
    def fromxml(cls, element):
        """Create new object with values from XML element."""
        elements = findall(element, "CloudFunctionConfiguration")
        cloud_func_config_list = []
        for tag in elements:
            cloud_func_config_list.append(CloudFuncConfig.fromxml(tag))
        elements = findall(element, "QueueConfiguration")
        queue_config_list = []
        for tag in elements:
            queue_config_list.append(QueueConfig.fromxml(tag))
        elements = findall(element, "TopicConfiguration")
        topic_config_list = []
        for tag in elements:
            topic_config_list.append(TopicConfig.fromxml(tag))
        return cls(
            cloud_func_config_list, queue_config_list, topic_config_list,
        )

    def toxml(self, element):
        """Convert to XML."""
        element = Element("NotificationConfiguration")
        for config in self._cloud_func_config_list:
            config.toxml(element)
        for config in self._queue_config_list:
            config.toxml(element)
        for config in self._topic_config_list:
            config.toxml(element)
        return element