# Repository URL to install this package: | Version: 297.0.1-0
# -*- coding: utf-8 -*-
# Copyright 2013 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Integration tests for rm command."""
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals
import re
from gslib.exception import NO_URLS_MATCHED_GENERIC
from gslib.exception import NO_URLS_MATCHED_TARGET
import gslib.tests.testcase as testcase
from gslib.tests.testcase.base import MAX_BUCKET_LENGTH
from gslib.tests.testcase.integration_testcase import SkipForS3
import gslib.tests.util as util
from gslib.tests.util import GenerationFromURI as urigen
from gslib.tests.util import ObjectToURI as suri
from gslib.tests.util import SetBotoConfigForTest
from gslib.utils.retry_util import Retry
class TestRm(testcase.GsUtilIntegrationTestCase):
"""Integration tests for rm command."""
def _CleanRmUiOutputBeforeChecking(self, stderr):
"""Excludes everything coming from the UI to avoid assert errors.
Args:
stderr: The cumulative stderr output.
Returns:
The cumulative stderr output without the expected UI output.
"""
ui_output_pattern = '[^\n\r]*objects][^\n\r]*[\n\r]'
final_message_pattern = 'Operation completed over[^\n]*'
ui_spinner_list = ['\\\r', '|\r', '/\r', '-\r']
ui_lines_list = (re.findall(ui_output_pattern, stderr) +
re.findall(final_message_pattern, stderr) +
ui_spinner_list)
for ui_line in ui_lines_list:
stderr = stderr.replace(ui_line, '')
return stderr
def _RunRemoveCommandAndCheck(self,
                              command_and_args,
                              objects_to_remove=None,
                              buckets_to_remove=None,
                              stdin=None):
  """Tests a remove command in the presence of eventual listing consistency.

  Eventual listing consistency means that a remove command may not see all
  of the objects to be removed at once. When removing multiple objects
  (or buckets via -r), some calls may return no matches and multiple calls
  to the rm command may be necessary to reach the desired state. This function
  retries the rm command, incrementally tracking what has been removed and
  ensuring that the exact set of objects/buckets are removed across all
  retried calls.

  When buckets are not multiregional and the default provider is 'gs', the
  command is run exactly once and its cleaned stderr must match the expected
  set of "Removing ..." lines.

  The caller is responsible for confirming the existence of buckets/objects
  prior to calling this function.

  Args:
    command_and_args: List of strings representing the rm command+args to run.
    objects_to_remove: List of object URL strings (optionally including
        generation) that should be removed by the command, if any.
    buckets_to_remove: List of bucket URL strings that should be removed by
        the command, if any.
    stdin: String of data to pipe to the process as standard input (for
        testing -I option).
  """
  # Normalise the optional lists once so every later loop (including the one
  # inside the retried closure) can iterate them without a None check.
  objects_to_remove = objects_to_remove or []
  buckets_to_remove = buckets_to_remove or []
  bucket_strings = ['Removing %s/...' % b for b in buckets_to_remove]
  object_strings = ['Removing %s...' % o for o in objects_to_remove]
  expected_stderr_lines = set(object_strings + bucket_strings)

  if not self.multiregional_buckets and self.default_provider == 'gs':
    stderr = self.RunGsUtil(command_and_args,
                            return_stderr=True,
                            expected_status=None,
                            stdin=stdin)
    num_objects = len(object_strings)
    # Asserting for operation completion
    if '-q' not in command_and_args:
      if '-m' in command_and_args:
        self.assertIn('[%d/%d objects]' % (num_objects, num_objects), stderr)
      else:
        self.assertIn('[%d objects]' % num_objects, stderr)
    stderr = self._CleanRmUiOutputBeforeChecking(stderr)
    stderr_set = set(stderr.splitlines())
    stderr_set.discard('')  # Avoid groups represented by an empty string.
    self.assertEqual(stderr_set, expected_stderr_lines)
  else:
    cumulative_stderr_lines = set()

    @Retry(AssertionError, tries=5, timeout_secs=1)
    def _RunRmCommandAndCheck():
      """Runs/retries the command updating+checking cumulative output."""
      stderr = self.RunGsUtil(command_and_args,
                              return_stderr=True,
                              expected_status=None,
                              stdin=stdin)
      stderr = self._CleanRmUiOutputBeforeChecking(stderr)
      update_lines = True
      # Retry 404's and 409's due to eventual listing consistency, but don't
      # add the output to the set.
      if (NO_URLS_MATCHED_GENERIC in stderr or
          '409 BucketNotEmpty' in stderr or
          '409 VersionedBucketNotEmpty' in stderr):
        update_lines = False

      # For recursive deletes of buckets, it is possible that the bucket is
      # deleted before the objects are all present in the listing, in which
      # case we will never see all of the expected "Removing object..."
      # messages. Since this is still a successful outcome, just return
      # successfully.
      if '-r' in command_and_args and 'bucket does not exist' in stderr:
        for bucket_to_remove in buckets_to_remove:
          # re.search rather than re.match: the 404 message can appear on any
          # line of stderr, and a '.*' prefix under re.match only ever covers
          # the first line because '.' does not match newlines.
          matching_bucket = re.search(
              r'404\s+%s\s+bucket does not exist' %
              re.escape(bucket_to_remove), stderr)
          if matching_bucket:
            removing_line = 'Removing %s/...' % bucket_to_remove
            # The bucket removal may have been reported by an earlier attempt
            # or by this one; either counts as success.
            if any(removing_line in line for line in cumulative_stderr_lines):
              return
            if removing_line in stderr:
              return

      if update_lines:
        cumulative_stderr_lines.update(s for s in stderr.splitlines() if s)

      # Ensure all of the expected strings are present.
      self.assertEqual(cumulative_stderr_lines, expected_stderr_lines)

    _RunRmCommandAndCheck()
def test_all_versions_current(self):
  """Test that 'rm -a' for an object with a current version works."""
  versioned_bucket = self.CreateVersionedBucket()
  obj = versioned_bucket.clone_replace_name('foo')
  obj.set_contents_from_string('bar')
  first_generation = urigen(obj)
  obj.set_contents_from_string('baz')
  second_generation = urigen(obj)

  def _RemoveAndVerify(seen_lines):
    """Runs 'rm -a' and checks the lines accumulated across attempts."""
    rm_stderr = self.RunGsUtil(['-m', 'rm', '-a', suri(obj)],
                               return_stderr=True)
    seen_lines.update(rm_stderr.splitlines())
    combined = '\n'.join(seen_lines)
    removal_count = combined.count('Removing %s://' % self.default_provider)
    self.assertEqual(removal_count, 2)
    for generation in (first_generation, second_generation):
      self.assertIn('Removing %s#%s...' % (suri(obj), generation), combined)

  accumulated_lines = set()
  if not self.multiregional_buckets:
    _RemoveAndVerify(accumulated_lines)
  else:
    self.AssertNObjectsInBucket(versioned_bucket, 2, versioned=True)

    # Use @Retry as hedge against bucket listing eventual consistency.
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _RemoveAndVerifyWithRetries(seen_lines):
      _RemoveAndVerify(seen_lines)

    _RemoveAndVerifyWithRetries(accumulated_lines)

  self.AssertNObjectsInBucket(versioned_bucket, 0, versioned=True)
def test_all_versions_no_current(self):
  """Test that 'rm -a' removes both generations of a twice-written object.

  NOTE(review): the name refers to an object without a current version, but
  the setup here only writes the object twice -- confirm the intended
  scenario.
  """
  versioned_bucket = self.CreateVersionedBucket()
  obj = versioned_bucket.clone_replace_name('foo')
  obj.set_contents_from_string('bar')
  first_generation = urigen(obj)
  obj.set_contents_from_string('baz')
  second_generation = urigen(obj)

  rm_command = ['-m', 'rm', '-a', suri(obj)]
  expected_removals = [
      '%s#%s' % (suri(obj), generation)
      for generation in (first_generation, second_generation)
  ]
  self._RunRemoveCommandAndCheck(rm_command,
                                 objects_to_remove=expected_removals)
  self.AssertNObjectsInBucket(versioned_bucket, 0, versioned=True)
def test_fails_for_missing_obj(self):
  """Test that 'rm -a' on a missing object exits 1 and names the target."""
  versioned_bucket = self.CreateVersionedBucket()
  rm_command = ['rm', '-a', '%s' % suri(versioned_bucket, 'foo')]
  rm_stderr = self.RunGsUtil(rm_command,
                             return_stderr=True,
                             expected_status=1)
  expected_message = NO_URLS_MATCHED_TARGET % suri(versioned_bucket, 'foo')
  self.assertIn(expected_message, rm_stderr)
def test_remove_recursive_prefix(self):
  """Test that 'rm -r' on a prefix reports removal of the nested object."""
  bucket = self.CreateBucket()
  nested_object = self.CreateObject(bucket_uri=bucket,
                                    object_name='a/b/c',
                                    contents=b'foo')
  if self.multiregional_buckets:
    self.AssertNObjectsInBucket(bucket, 1)

  rm_command = ['rm', '-r', suri(bucket, 'a')]
  rm_stderr = self.RunGsUtil(rm_command, return_stderr=True)
  expected_fragment = 'Removing %s' % suri(nested_object)
  self.assertIn(expected_fragment, rm_stderr)
def test_remove_all_versions_recursive_on_bucket(self):
  """Test that 'rm -r' works on bucket."""
  bucket = self.CreateVersionedBucket()
  first_key = bucket.clone_replace_name('foo')
  second_key = bucket.clone_replace_name('foo2')

  # Write both keys twice, recording each key's generation after every round.
  first_key.set_contents_from_string('bar')
  second_key.set_contents_from_string('bar2')
  first_key_gen1 = urigen(first_key)
  second_key_gen1 = urigen(second_key)
  first_key.set_contents_from_string('baz')
  second_key.set_contents_from_string('baz2')
  first_key_gen2 = urigen(first_key)
  second_key_gen2 = urigen(second_key)

  if self.multiregional_buckets:
    self.AssertNObjectsInBucket(bucket, 4, versioned=True)

  rm_command = ['rm', '-r', suri(bucket)]
  versions = (
      (first_key, first_key_gen1),
      (first_key, first_key_gen2),
      (second_key, second_key_gen1),
      (second_key, second_key_gen2),
  )
  expected_objects = [
      '%s#%s' % (suri(key), generation) for key, generation in versions
  ]
  self._RunRemoveCommandAndCheck(rm_command,
                                 objects_to_remove=expected_objects,
                                 buckets_to_remove=[suri(bucket)])

  # Use @Retry as hedge against bucket listing eventual consistency.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _AssertBucketGone():
    # Bucket should no longer exist.
    ls_stderr = self.RunGsUtil(['ls', '-a', suri(bucket)],
                               return_stderr=True,
                               expected_status=1)
    self.assertIn('bucket does not exist', ls_stderr)

  _AssertBucketGone()
def test_remove_all_versions_recursive_on_subdir(self):
  """Test that 'rm -r' works on subdir."""
  bucket = self.CreateVersionedBucket()
  first_key = bucket.clone_replace_name('dir/foo')
  second_key = bucket.clone_replace_name('dir/foo2')

  # Write both keys twice, recording each key's generation after every round.
  first_key.set_contents_from_string('bar')
  second_key.set_contents_from_string('bar2')
  first_key_gen1 = urigen(first_key)
  second_key_gen1 = urigen(second_key)
  first_key.set_contents_from_string('baz')
  second_key.set_contents_from_string('baz2')
  first_key_gen2 = urigen(first_key)
  second_key_gen2 = urigen(second_key)

  if self.multiregional_buckets:
    self.AssertNObjectsInBucket(bucket, 4, versioned=True)

  rm_command = ['rm', '-r', '%s' % suri(bucket, 'dir')]
  versions = (
      (first_key, first_key_gen1),
      (first_key, first_key_gen2),
      (second_key, second_key_gen1),
      (second_key, second_key_gen2),
  )
  expected_objects = [
      '%s#%s' % (suri(key), generation) for key, generation in versions
  ]
  self._RunRemoveCommandAndCheck(rm_command,
                                 objects_to_remove=expected_objects)
  self.AssertNObjectsInBucket(bucket, 0, versioned=True)
def test_rm_seek_ahead(self):
  """Test that 'rm' prints a work estimate when task estimation is forced."""
  target = self.CreateObject(contents=b'foo')
  estimation_overrides = [
      ('GSUtil', 'task_estimation_threshold', '1'),
      ('GSUtil', 'task_estimation_force', 'True'),
  ]
  with SetBotoConfigForTest(estimation_overrides):
    rm_stderr = self.RunGsUtil(['-m', 'rm', suri(target)], return_stderr=True)
    self.assertIn('Estimated work for this command: objects: 1\n', rm_stderr)
def test_rm_seek_ahead_stdin_args(self):
  """Test that 'rm -I' prints no work estimate even when one is forced."""
  target = self.CreateObject(contents=b'foo')
  estimation_overrides = [
      ('GSUtil', 'task_estimation_threshold', '1'),
      ('GSUtil', 'task_estimation_force', 'True'),
  ]
  with SetBotoConfigForTest(estimation_overrides):
    rm_stderr = self.RunGsUtil(['-m', 'rm', '-I'],
                               stdin=suri(target),
                               return_stderr=True)
    self.assertNotIn('Estimated work', rm_stderr)
def test_missing_first_force(self):
  """Test that 'rm -f' removes a present object listed after a missing one.

  Without -f the command is expected to exit 1. With -f it is also expected
  to exit 1, but exactly one removal is reported and a later stat of the
  object fails.
  """
  bucket = self.CreateBucket()
  present_object = self.CreateObject(bucket_uri=bucket,
                                     object_name='present',
                                     contents=b'foo')
  if self.multiregional_buckets:
    self.AssertNObjectsInBucket(bucket, 1)

  plain_command = ['rm', '%s' % suri(bucket, 'missing'), suri(present_object)]
  self.RunGsUtil(plain_command, expected_status=1)

  forced_command = [
      'rm', '-f',
      '%s' % suri(bucket, 'missing'),
      suri(present_object)
  ]
  forced_stderr = self.RunGsUtil(forced_command,
                                 return_stderr=True,
                                 expected_status=1)
  removal_count = forced_stderr.count('Removing %s://' % self.default_provider)
  self.assertEqual(removal_count, 1)
  self.RunGsUtil(['stat', suri(present_object)], expected_status=1)
def test_some_missing(self):
  """Test that 'rm -a' fails when some but not all uris don't exist."""
  versioned_bucket = self.CreateVersionedBucket()
  existing_key = versioned_bucket.clone_replace_name('foo')
  existing_key.set_contents_from_string('bar')
  if self.multiregional_buckets:
    self.AssertNObjectsInBucket(versioned_bucket, 1, versioned=True)

  rm_command = [
      'rm', '-a',
      suri(existing_key),
      '%s' % suri(versioned_bucket, 'missing')
  ]
  rm_stderr = self.RunGsUtil(rm_command,
                             return_stderr=True,
                             expected_status=1)
  removal_count = rm_stderr.count('Removing %s://' % self.default_provider)
  self.assertEqual(removal_count, 1)
  expected_message = NO_URLS_MATCHED_TARGET % suri(versioned_bucket, 'missing')
  self.assertIn(expected_message, rm_stderr)
def test_some_missing_force(self):
  """Test that 'rm -af' removes the existing object despite a missing uri.

  The command is still expected to exit with status 1; exactly one removal
  is reported and the bucket ends up with no objects.
  """
  versioned_bucket = self.CreateVersionedBucket()
  existing_key = versioned_bucket.clone_replace_name('foo')
  existing_key.set_contents_from_string('bar')
  if self.multiregional_buckets:
    self.AssertNObjectsInBucket(versioned_bucket, 1, versioned=True)

  rm_command = [
      'rm', '-af',
      suri(existing_key),
      '%s' % suri(versioned_bucket, 'missing')
  ]
  rm_stderr = self.RunGsUtil(rm_command,
                             return_stderr=True,
                             expected_status=1)
  removal_count = rm_stderr.count('Removing %s://' % self.default_provider)
  self.assertEqual(removal_count, 1)
  self.AssertNObjectsInBucket(versioned_bucket, 0)
def test_folder_objects_deleted(self):
  """Test for 'rm -r' of a folder with a dir_$folder$ marker."""
  versioned_bucket = self.CreateVersionedBucket()
  nested_key = versioned_bucket.clone_replace_name('abc/o1')
  nested_key.set_contents_from_string('foobar')
  folder_marker = versioned_bucket.clone_replace_name('abc_$folder$')
  folder_marker.set_contents_from_string('')

  def _RemoveFolderAndVerifyEmpty():
    """Removes the 'abc' folder and checks that no object versions remain."""
    rm_command = ['rm', '-r', '%s' % suri(versioned_bucket, 'abc')]
    self.RunGsUtil(rm_command, expected_status=None)
    self.AssertNObjectsInBucket(versioned_bucket, 0, versioned=True)

  if not self.multiregional_buckets:
    _RemoveFolderAndVerifyEmpty()
  else:
    self.AssertNObjectsInBucket(versioned_bucket, 2, versioned=True)

    # This could fail due to eventual listing consistency, so use retry and
    # expected_status=None to guard against No URLs matched exceptions.
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _RemoveFolderWithRetries():
      _RemoveFolderAndVerifyEmpty()

    _RemoveFolderWithRetries()

  # Bucket should not be deleted (Should not get ServiceException).
  versioned_bucket.get_location(validate=False)
def test_folder_objects_deleted_with_wildcard(self):
  """Test for 'rm -r' with a '**' wildcard and a dir_$folder$ marker."""
  versioned_bucket = self.CreateVersionedBucket()
  nested_key = versioned_bucket.clone_replace_name('abc/o1')
  nested_key.set_contents_from_string('foobar')
  folder_marker = versioned_bucket.clone_replace_name('abc_$folder$')
  folder_marker.set_contents_from_string('')

  if self.multiregional_buckets:
    self.AssertNObjectsInBucket(versioned_bucket, 2, versioned=True)

  rm_command = ['rm', '-r', '%s' % suri(versioned_bucket, '**')]
  expected_objects = [
      '%s#%s' % (suri(uri), urigen(uri)) for uri in (nested_key, folder_marker)
  ]
  self._RunRemoveCommandAndCheck(rm_command,
                                 objects_to_remove=expected_objects)
  self.AssertNObjectsInBucket(versioned_bucket, 0, versioned=True)
  # Bucket should not be deleted (Should not get ServiceException).
  versioned_bucket.get_location(validate=False)
@SkipForS3('Listing/removing S3 DeleteMarkers is not supported')
def test_recursive_bucket_rm(self):
  """Test for 'rm -r' of a bucket.

  Covers a plain bucket holding one object, then a versioned bucket holding
  three generations of one object. In the versioned case a plain 'rm' of '**'
  is expected to report only the latest object, 'rb' is expected to fail with
  'Bucket is not empty', and 'rm -r' is expected to delete the bucket.
  """
  plain_bucket = self.CreateBucket()
  plain_object = self.CreateObject(plain_bucket, contents=b'foo')
  if self.multiregional_buckets:
    self.AssertNObjectsInBucket(plain_bucket, 1)

  expected_objects = ['%s#%s' % (suri(plain_object), urigen(plain_object))]
  self._RunRemoveCommandAndCheck(['rm', '-r', suri(plain_bucket)],
                                 objects_to_remove=expected_objects,
                                 buckets_to_remove=[suri(plain_bucket)])

  # Use @Retry as hedge against bucket listing eventual consistency.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _AssertPlainBucketGone():
    # Bucket should be deleted.
    ls_stderr = self.RunGsUtil(['ls', '-Lb', suri(plain_bucket)],
                               return_stderr=True,
                               expected_status=1)
    self.assertIn('bucket does not exist', ls_stderr)

  _AssertPlainBucketGone()

  # Now try same thing, but for a versioned bucket with multiple versions of
  # an object present.
  versioned_bucket = self.CreateVersionedBucket()
  self.CreateObject(versioned_bucket, 'obj', contents=b'z')
  self.CreateObject(versioned_bucket, 'obj', contents=b'z')
  latest_object = self.CreateObject(versioned_bucket, 'obj', contents=b'z')
  if self.multiregional_buckets:
    self.AssertNObjectsInBucket(versioned_bucket, 3, versioned=True)

  self._RunRemoveCommandAndCheck(['rm', suri(versioned_bucket, '**')],
                                 objects_to_remove=['%s' % latest_object])
  rb_stderr = self.RunGsUtil(['rb', suri(versioned_bucket)],
                             return_stderr=True,
                             expected_status=1)
  self.assertIn('Bucket is not empty', rb_stderr)

  # Now try with rm -r.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _RemoveVersionedBucketAndAssertGone():
    self.RunGsUtil(['rm', '-r', suri(versioned_bucket)])
    # Bucket should be deleted.
    ls_stderr = self.RunGsUtil(['ls', '-Lb', suri(versioned_bucket)],
                               return_stderr=True,
                               expected_status=1)
    self.assertIn('bucket does not exist', ls_stderr)

  _RemoveVersionedBucketAndAssertGone()
def test_recursive_bucket_rm_with_wildcarding(self):
  """Tests removing all objects and buckets matching a bucket wildcard."""
  # Build a shared bucket-name stem: truncated test name, random suffix,
  # 'aaa-' prefix, then validation.
  truncated_name = ('gsutil-test-%s' %
                    self.GetTestMethodName())[:MAX_BUCKET_LENGTH - 20]
  randomized_name = '%s-%s' % (truncated_name, self.MakeRandomTestString())
  name_stem = util.MakeBucketNameValid('aaa-' + randomized_name)

  matching_bucket1 = self.CreateBucket(bucket_name='%s-tbuck1' % name_stem)
  matching_bucket2 = self.CreateBucket(bucket_name='%s-tbuck2' % name_stem)
  other_bucket = self.CreateBucket(bucket_name='%s-tb3' % name_stem)
  matching_object1 = self.CreateObject(bucket_uri=matching_bucket1,
                                       object_name='o1',
                                       contents=b'z')
  matching_object2 = self.CreateObject(bucket_uri=matching_bucket2,
                                       object_name='o2',
                                       contents=b'z')
  self.CreateObject(bucket_uri=other_bucket, object_name='o3', contents=b'z')

  if self.multiregional_buckets:
    for bucket in (matching_bucket1, matching_bucket2, other_bucket):
      self.AssertNObjectsInBucket(bucket, 1)

  wildcard_url = '%s://%s-tbu*' % (self.default_provider, name_stem)
  expected_objects = [
      '%s#%s' % (suri(obj), urigen(obj))
      for obj in (matching_object1, matching_object2)
  ]
  expected_buckets = [suri(matching_bucket1), suri(matching_bucket2)]
  self._RunRemoveCommandAndCheck(['rm', '-r', wildcard_url],
                                 objects_to_remove=expected_objects,
                                 buckets_to_remove=expected_buckets)

  # The bucket that does not match the wildcard keeps its object.
  self.AssertNObjectsInBucket(other_bucket, 1)
def test_rm_quiet(self):
  """Test that 'rm -q' outputs no progress indications."""
  bucket = self.CreateBucket()
  target = self.CreateObject(bucket_uri=bucket, contents=b'foo')
  if self.multiregional_buckets:
    self.AssertNObjectsInBucket(bucket, 1)

  # An empty expectation list means no "Removing ..." lines may be printed.
  quiet_command = ['-q', 'rm', suri(target)]
  self._RunRemoveCommandAndCheck(quiet_command, objects_to_remove=[])
  self.AssertNObjectsInBucket(bucket, 0)
@SkipForS3('The boto lib used for S3 does not handle objects '
           'starting with slashes if we use V4 signature')
def test_rm_object_with_prefix_slash(self):
  """Tests removing a bucket that has an object starting with slash.

  The boto lib used for S3 does not handle objects starting with slashes
  if we use V4 signature. Hence we are testing objects with prefix
  slashes separately.
  """
  versioned_bucket = self.CreateVersionedBucket()
  slash_object = self.CreateObject(bucket_uri=versioned_bucket,
                                   object_name='/dirwithslash/foo',
                                   contents=b'z')
  if self.multiregional_buckets:
    self.AssertNObjectsInBucket(versioned_bucket, 1, versioned=True)

  rm_command = ['rm', '-r', suri(versioned_bucket)]
  expected_objects = ['%s#%s' % (suri(slash_object), urigen(slash_object))]
  self._RunRemoveCommandAndCheck(rm_command,
                                 objects_to_remove=expected_objects,
                                 buckets_to_remove=[suri(versioned_bucket)])
def test_rm_object_with_slashes(self):
  """Tests removing a bucket that has objects with slashes."""
  versioned_bucket = self.CreateVersionedBucket()
  created_objects = [
      self.CreateObject(bucket_uri=versioned_bucket,
                        object_name=object_name,
                        contents=b'z')
      for object_name in ('h/e/l//lo', 'dirnoslash/foo', 'dirnoslash/foo2')
  ]
  if self.multiregional_buckets:
    self.AssertNObjectsInBucket(versioned_bucket, 3, versioned=True)

  rm_command = ['rm', '-r', suri(versioned_bucket)]
  expected_objects = [
      '%s#%s' % (suri(obj), urigen(obj)) for obj in created_objects
  ]
  self._RunRemoveCommandAndCheck(rm_command,
                                 objects_to_remove=expected_objects,
                                 buckets_to_remove=[suri(versioned_bucket)])
@SkipForS3('The boto lib used for S3 does not handle objects '
           'starting with slashes if we use V4 signature')
def test_slasher_horror_film(self):
  """Tests removing a bucket with objects that are filled with slashes."""
  versioned_bucket = self.CreateVersionedBucket()
  # (object name, contents) pairs, created in this order.
  cast = (
      ('h/e/l//lo', b'Halloween'),
      ('/h/e/l/l/o', b'A Nightmare on Elm Street'),
      ('//h//e/l//l/o', b'Friday the 13th'),
      ('//h//e//l//l//o', b'I Know What You Did Last Summer'),
      ('/', b'Scream'),
      ('//', b'Child\'s Play'),
      ('///', b'The Prowler'),
      ('////', b'Black Christmas'),
      ('everything/is/better/with/slashes///////', b'Maniac'),
  )
  created_objects = [
      self.CreateObject(bucket_uri=versioned_bucket,
                        object_name=object_name,
                        contents=contents) for object_name, contents in cast
  ]
  if self.multiregional_buckets:
    self.AssertNObjectsInBucket(versioned_bucket, 9, versioned=True)

  # We add a slash to URIs with a trailing slash,
  # because ObjectToURI (suri) removes one trailing slash.
  objects_to_remove = []
  for (object_name, _), obj in zip(cast, created_objects):
    object_url = suri(obj)
    if object_name.endswith('/'):
      object_url = object_url + '/'
    objects_to_remove.append('%s#%s' % (object_url, urigen(obj)))

  rm_command = ['-m', 'rm', '-r', suri(versioned_bucket)]
  self._RunRemoveCommandAndCheck(rm_command,
                                 objects_to_remove=objects_to_remove,
                                 buckets_to_remove=[suri(versioned_bucket)])
@SkipForS3('GCS versioning headers not supported by S3')
def test_rm_failing_precondition(self):
  """Test for '-h x-goog-if-generation-match:value rm' of an object."""
  bucket = self.CreateBucket()
  target = self.CreateObject(bucket, contents=b'foo')

  # 12345 is a fixed generation value; the command is expected to fail with a
  # 412 precondition error.
  rm_command = [
      '-h', 'x-goog-if-generation-match:12345', 'rm',
      suri(target)
  ]
  rm_stderr = self.RunGsUtil(rm_command,
                             return_stderr=True,
                             expected_status=1)
  expected_pattern = (
      r'PreconditionException: 412 (Precondition)?\s*(Failed|None)')
  self.assertRegex(rm_stderr, expected_pattern)
def test_stdin_args(self):
  """Tests rm with the -I option."""
  first_bucket = self.CreateVersionedBucket()
  foo_object = self.CreateObject(bucket_uri=first_bucket,
                                 object_name='foo',
                                 contents=b'foocontents')
  # 'bar' is deliberately left out of the removal list below.
  self.CreateObject(bucket_uri=first_bucket,
                    object_name='bar',
                    contents=b'barcontents')
  baz_object = self.CreateObject(bucket_uri=first_bucket,
                                 object_name='baz',
                                 contents=b'bazcontents')
  second_bucket = self.CreateVersionedBucket()
  moo_object = self.CreateObject(bucket_uri=second_bucket,
                                 object_name='moo',
                                 contents=b'moocontents')

  if self.multiregional_buckets:
    self.AssertNObjectsInBucket(first_bucket, 3, versioned=True)
    self.AssertNObjectsInBucket(second_bucket, 1, versioned=True)

  objects_to_remove = [
      '%s#%s' % (suri(obj), urigen(obj))
      for obj in (foo_object, baz_object, moo_object)
  ]
  # The same version-specific URLs are fed to the command on stdin, one per
  # line.
  piped_urls = '\n'.join(objects_to_remove)
  self._RunRemoveCommandAndCheck(['rm', '-I'],
                                 objects_to_remove=objects_to_remove,
                                 stdin=piped_urls)
  self.AssertNObjectsInBucket(first_bucket, 1, versioned=True)
  self.AssertNObjectsInBucket(second_bucket, 0, versioned=True)
def test_rm_nonexistent_bucket_recursive(self):
  """Test that 'rm -rf' on a nonexistent bucket exits 1 and says so."""
  missing_bucket_url = '%s://%s' % (self.default_provider,
                                    self.nonexistent_bucket_name)
  rm_stderr = self.RunGsUtil(['rm', '-rf', missing_bucket_url],
                             return_stderr=True,
                             expected_status=1)
  self.assertIn('Encountered non-existent bucket', rm_stderr)
def test_rm_multiple_nonexistent_objects(self):
  """Test that 'rm -rf' reports every nonexistent object it could not remove.

  Two distinct object names that were never created are passed to the
  command; it is expected to exit 1 and report that 2 files/objects could not
  be removed.
  """
  bucket_uri = self.CreateBucket()
  nonexistent_object1 = suri(bucket_uri, 'nonexistent1')
  # Use a second, different name so that the test really covers multiple
  # nonexistent objects rather than the same URL given twice.
  nonexistent_object2 = suri(bucket_uri, 'nonexistent2')
  stderr = self.RunGsUtil(
      ['rm', '-rf', nonexistent_object1, nonexistent_object2],
      return_stderr=True,
      expected_status=1)
  self.assertIn('2 files/objects could not be removed.', stderr)