Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
google-cloud-storage / cloud / storage / notification.py
Size: Mime:
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Support for bucket notification resources."""

import re

from google.api_core.exceptions import NotFound


OBJECT_FINALIZE_EVENT_TYPE = 'OBJECT_FINALIZE'
OBJECT_METADATA_UPDATE_EVENT_TYPE = 'OBJECT_METADATA_UPDATE'
OBJECT_DELETE_EVENT_TYPE = 'OBJECT_DELETE'
OBJECT_ARCHIVE_EVENT_TYPE = 'OBJECT_ARCHIVE'

JSON_API_V1_PAYLOAD_FORMAT = 'JSON_API_V1'
NONE_PAYLOAD_FORMAT = 'NONE'

_TOPIC_REF_FMT = '//pubsub.googleapis.com/projects/{}/topics/{}'
_PROJECT_PATTERN = r'(?P<project>[a-z][a-z0-9-]{4,28}[a-z0-9])'
_TOPIC_NAME_PATTERN = r'(?P<name>[A-Za-z](\w|[-_.~+%])+)'
_TOPIC_REF_PATTERN = _TOPIC_REF_FMT.format(
    _PROJECT_PATTERN, _TOPIC_NAME_PATTERN)
_TOPIC_REF_RE = re.compile(_TOPIC_REF_PATTERN)
_BAD_TOPIC = (
    'Resource has invalid topic: {}; see '
    'https://cloud.google.com/storage/docs/json_api/v1/'
    'notifications/insert#topic')


class BucketNotification(object):
    """Represent a single notification resource for a bucket.

    See: https://cloud.google.com/storage/docs/json_api/v1/notifications

    :type bucket: :class:`google.cloud.storage.bucket.Bucket`
    :param bucket: Bucket to which the notification is bound.

    :type topic_name: str
    :param topic_name: Topic name to which notifications are published.

    :type topic_project: str
    :param topic_project:
        (Optional) project ID of topic to which notifications are published.
        If not passed, uses the project ID of the bucket's client.

    :type custom_attributes: dict
    :param custom_attributes:
        (Optional) additional attributes passed with notification events.

    :type event_types: list(str)
    :param event_types:
        (Optional) event types for which notificatin events are published.

    :type blob_name_prefix: str
    :param blob_name_prefix:
        (Optional) prefix of blob names for which notification events are
        published..

    :type payload_format: str
    :param payload_format:
        (Optional) format of payload for notification events.
    """
    def __init__(self, bucket, topic_name,
                 topic_project=None, custom_attributes=None, event_types=None,
                 blob_name_prefix=None, payload_format=NONE_PAYLOAD_FORMAT):
        self._bucket = bucket
        self._topic_name = topic_name

        if topic_project is None:
            topic_project = bucket.client.project

        if topic_project is None:
            raise ValueError(
                "Client project not set:  pass an explicit topic_project.")

        self._topic_project = topic_project

        self._properties = {}

        if custom_attributes is not None:
            self._properties['custom_attributes'] = custom_attributes

        if event_types is not None:
            self._properties['event_types'] = event_types

        if blob_name_prefix is not None:
            self._properties['object_name_prefix'] = blob_name_prefix

        self._properties['payload_format'] = payload_format

    @classmethod
    def from_api_repr(cls, resource, bucket):
        """Construct an instance from the JSON repr returned by the server.

        See: https://cloud.google.com/storage/docs/json_api/v1/notifications

        :type resource: dict
        :param resource: JSON repr of the notification

        :type bucket: :class:`google.cloud.storage.bucket.Bucket`
        :param bucket: Bucket to which the notification is bound.

        :rtype: :class:`BucketNotification`
        :returns: the new notification instance
        """
        topic_path = resource.get('topic')
        if topic_path is None:
            raise ValueError('Resource has no topic')

        name, project = _parse_topic_path(topic_path)
        instance = cls(bucket, name, topic_project=project)
        instance._properties = resource

        return instance

    @property
    def bucket(self):
        """Bucket to which the notification is bound."""
        return self._bucket

    @property
    def topic_name(self):
        """Topic name to which notifications are published."""
        return self._topic_name

    @property
    def topic_project(self):
        """Project ID of topic to which notifications are published.
        """
        return self._topic_project

    @property
    def custom_attributes(self):
        """Custom attributes passed with notification events.
        """
        return self._properties.get('custom_attributes')

    @property
    def event_types(self):
        """Event types for which notification events are published.
        """
        return self._properties.get('event_types')

    @property
    def blob_name_prefix(self):
        """Prefix of blob names for which notification events are published.
        """
        return self._properties.get('object_name_prefix')

    @property
    def payload_format(self):
        """Format of payload of notification events."""
        return self._properties.get('payload_format')

    @property
    def notification_id(self):
        """Server-set ID of notification resource."""
        return self._properties.get('id')

    @property
    def etag(self):
        """Server-set ETag of notification resource."""
        return self._properties.get('etag')

    @property
    def self_link(self):
        """Server-set ETag of notification resource."""
        return self._properties.get('selfLink')

    @property
    def client(self):
        """The client bound to this notfication."""
        return self.bucket.client

    @property
    def path(self):
        """The URL path for this notification."""
        return '/b/{}/notificationConfigs/{}'.format(
            self.bucket.name, self.notification_id)

    def _require_client(self, client):
        """Check client or verify over-ride.

        :type client: :class:`~google.cloud.storage.client.Client` or
                      ``NoneType``
        :param client: the client to use.

        :rtype: :class:`google.cloud.storage.client.Client`
        :returns: The client passed in or the bucket's client.
        """
        if client is None:
            client = self.client
        return client

    def _set_properties(self, response):
        """Helper for :meth:`reload`.

        :type response: dict
        :param response: resource mapping from server
        """
        self._properties.clear()
        self._properties.update(response)

    def create(self, client=None):
        """API wrapper: create the notification.

        See:
        https://cloud.google.com/storage/docs/json_api/v1/notifications/insert

        If :attr:`user_project` is set on the bucket, bills the API request
        to that project.

        :type client: :class:`~google.cloud.storage.client.Client`
        :param client: (Optional) the client to use.  If not passed, falls back
                       to the ``client`` stored on the notification's bucket.
        """
        if self.notification_id is not None:
            raise ValueError("Notification already exists w/ id: {}".format(
                self.notification_id))

        client = self._require_client(client)

        query_params = {}
        if self.bucket.user_project is not None:
            query_params['userProject'] = self.bucket.user_project

        path = '/b/{}/notificationConfigs'.format(self.bucket.name)
        properties = self._properties.copy()
        properties['topic'] = _TOPIC_REF_FMT.format(
            self.topic_project, self.topic_name)
        self._properties = client._connection.api_request(
            method='POST',
            path=path,
            query_params=query_params,
            data=properties,
        )

    def exists(self, client=None):
        """Test whether this notification exists.

        See:
        https://cloud.google.com/storage/docs/json_api/v1/notifications/get

        If :attr:`user_project` is set on the bucket, bills the API request
        to that project.

        :type client: :class:`~google.cloud.storage.client.Client` or
                      ``NoneType``
        :param client: Optional. The client to use.  If not passed, falls back
                       to the ``client`` stored on the current bucket.

        :rtype: bool
        :returns: True, if the notification exists, else False.
        :raises ValueError: if the notification has no ID.
        """
        if self.notification_id is None:
            raise ValueError("Notification not intialized by server")

        client = self._require_client(client)

        query_params = {}
        if self.bucket.user_project is not None:
            query_params['userProject'] = self.bucket.user_project

        try:
            client._connection.api_request(
                method='GET',
                path=self.path,
                query_params=query_params,
            )
        except NotFound:
            return False
        else:
            return True

    def reload(self, client=None):
        """Update this notification from the server configuration.

        See:
        https://cloud.google.com/storage/docs/json_api/v1/notifications/get

        If :attr:`user_project` is set on the bucket, bills the API request
        to that project.

        :type client: :class:`~google.cloud.storage.client.Client` or
                      ``NoneType``
        :param client: Optional. The client to use.  If not passed, falls back
                       to the ``client`` stored on the current bucket.

        :rtype: bool
        :returns: True, if the notification exists, else False.
        :raises ValueError: if the notification has no ID.
        """
        if self.notification_id is None:
            raise ValueError("Notification not intialized by server")

        client = self._require_client(client)

        query_params = {}
        if self.bucket.user_project is not None:
            query_params['userProject'] = self.bucket.user_project

        response = client._connection.api_request(
            method='GET',
            path=self.path,
            query_params=query_params,
        )
        self._set_properties(response)

    def delete(self, client=None):
        """Delete this notification.

        See:
        https://cloud.google.com/storage/docs/json_api/v1/notifications/delete

        If :attr:`user_project` is set on the bucket, bills the API request
        to that project.

        :type client: :class:`~google.cloud.storage.client.Client` or
                      ``NoneType``
        :param client: Optional. The client to use.  If not passed, falls back
                       to the ``client`` stored on the current bucket.

        :raises: :class:`google.api_core.exceptions.NotFound`:
            if the notification does not exist.
        :raises ValueError: if the notification has no ID.
        """
        if self.notification_id is None:
            raise ValueError("Notification not intialized by server")

        client = self._require_client(client)

        query_params = {}
        if self.bucket.user_project is not None:
            query_params['userProject'] = self.bucket.user_project

        client._connection.api_request(
            method='DELETE',
            path=self.path,
            query_params=query_params,
        )


def _parse_topic_path(topic_path):
    """Verify that a topic path is in the correct format.

    .. _resource manager docs: https://cloud.google.com/resource-manager/\
                               reference/rest/v1beta1/projects#\
                               Project.FIELDS.project_id
    .. _topic spec: https://cloud.google.com/storage/docs/json_api/v1/\
                    notifications/insert#topic

    Expected to be of the form:

        //pubsub.googleapis.com/projects/{project}/topics/{topic}

    where the ``project`` value must be "6 to 30 lowercase letters, digits,
    or hyphens. It must start with a letter. Trailing hyphens are prohibited."
    (see `resource manager docs`_) and ``topic`` must have length at least two,
    must start with a letter and may only contain alphanumeric characters or
    ``-``, ``_``, ``.``, ``~``, ``+`` or ``%`` (i.e characters used for URL
    encoding, see `topic spec`_).

    Args:
        topic_path (str): The topic path to be verified.

    Returns:
        Tuple[str, str]: The ``project`` and ``topic`` parsed from the
        ``topic_path``.

    Raises:
        ValueError: If the topic path is invalid.
    """
    match = _TOPIC_REF_RE.match(topic_path)
    if match is None:
        raise ValueError(_BAD_TOPIC.format(topic_path))

    return match.group('name'), match.group('project')