Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
google-cloud-sdk / usr / lib / google-cloud-sdk / platform / gsutil / gslib / tests / test_notification.py
Size: Mime:
# -*- coding: utf-8 -*-
# Copyright 2013 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Integration tests for notification command."""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals

import re
import time
import uuid

import boto

import gslib.tests.testcase as testcase
from gslib.tests.util import ObjectToURI as suri
from gslib.tests.util import unittest
from gslib.utils.retry_util import Retry


def _LoadNotificationUrl():
  return boto.config.get_value('GSUtil', 'test_notification_url')


NOTIFICATION_URL = _LoadNotificationUrl()


class TestNotification(testcase.GsUtilIntegrationTestCase):
  """Integration tests for notification command."""

  @unittest.skipUnless(NOTIFICATION_URL,
                       'Test requires notification URL configuration.')
  def test_watch_bucket(self):
    """Tests creating a notification channel on a bucket."""
    bucket_uri = self.CreateBucket()
    self.RunGsUtil(
        ['notification', 'watchbucket', NOTIFICATION_URL,
         suri(bucket_uri)])

    identifier = str(uuid.uuid4())
    token = str(uuid.uuid4())
    stderr = self.RunGsUtil([
        'notification', 'watchbucket', '-i', identifier, '-t', token,
        NOTIFICATION_URL,
        suri(bucket_uri)
    ],
                            return_stderr=True)
    self.assertIn('token: %s' % token, stderr)
    self.assertIn('identifier: %s' % identifier, stderr)

  @unittest.skipUnless(NOTIFICATION_URL,
                       'Test requires notification URL configuration.')
  def test_stop_channel(self):
    """Tests stopping a notification channel on a bucket."""
    bucket_uri = self.CreateBucket()
    stderr = self.RunGsUtil(
        ['notification', 'watchbucket', NOTIFICATION_URL,
         suri(bucket_uri)],
        return_stderr=True)

    channel_id = re.findall(r'channel identifier: (?P<id>.*)', stderr)
    self.assertEqual(len(channel_id), 1)
    resource_id = re.findall(r'resource identifier: (?P<id>.*)', stderr)
    self.assertEqual(len(resource_id), 1)

    channel_id = channel_id[0]
    resource_id = resource_id[0]

    self.RunGsUtil(['notification', 'stopchannel', channel_id, resource_id])

  @unittest.skipUnless(NOTIFICATION_URL,
                       'Test requires notification URL configuration.')
  def test_list_one_channel(self):
    """Tests listing notification channel on a bucket."""
    # TODO(b/132277269): Re-enable these once the service-side bug is fixed.
    return unittest.skip('Functionality has been disabled due to b/132277269')

    bucket_uri = self.CreateBucket()

    # Set up an OCN (object change notification) on the newly created bucket.
    self.RunGsUtil(
        ['notification', 'watchbucket', NOTIFICATION_URL,
         suri(bucket_uri)],
        return_stderr=False)
    # The OCN listing in the service is eventually consistent. In initial
    # tests, it almost never was ready immediately after calling WatchBucket
    # above, so we A) sleep for a few seconds before the first OCN listing
    # attempt, and B) wrap the OCN listing attempt in retry logic in case
    # it raises a BucketNotFoundException (note that RunGsUtil will raise this
    # as an AssertionError due to the exit status not being 0).
    @Retry(AssertionError, tries=3, timeout_secs=5)
    def _ListObjectChangeNotifications():
      stderr = self.RunGsUtil(['notification', 'list', '-o',
                               suri(bucket_uri)],
                              return_stderr=True)
      return stderr

    time.sleep(5)
    stderr = _ListObjectChangeNotifications()

    channel_id = re.findall(r'Channel identifier: (?P<id>.*)', stderr)
    self.assertEqual(len(channel_id), 1)
    resource_id = re.findall(r'Resource identifier: (?P<id>.*)', stderr)
    self.assertEqual(len(resource_id), 1)
    push_url = re.findall(r'Application URL: (?P<id>.*)', stderr)
    self.assertEqual(len(push_url), 1)
    subscriber_email = re.findall(r'Created by: (?P<id>.*)', stderr)
    self.assertEqual(len(subscriber_email), 1)
    creation_time = re.findall(r'Creation time: (?P<id>.*)', stderr)
    self.assertEqual(len(creation_time), 1)

  def test_invalid_subcommand(self):
    stderr = self.RunGsUtil(['notification', 'foo', 'bar', 'baz'],
                            return_stderr=True,
                            expected_status=1)
    self.assertIn('Invalid subcommand', stderr)