Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
pymongo / test / test_legacy_api.py
Size: Mime:
# Copyright 2015-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Test various legacy / deprecated API features."""

import itertools
import sys
import threading
import time
import uuid
import warnings

sys.path[0:0] = [""]

from bson.binary import PYTHON_LEGACY, STANDARD
from bson.code import Code
from bson.codec_options import CodecOptions
from bson.objectid import ObjectId
from bson.py3compat import string_type
from bson.son import SON
from pymongo import ASCENDING, DESCENDING, GEOHAYSTACK
from pymongo.database import Database
from pymongo.common import partition_node
from pymongo.errors import (BulkWriteError,
                            ConfigurationError,
                            CursorNotFound,
                            DocumentTooLarge,
                            DuplicateKeyError,
                            InvalidDocument,
                            InvalidOperation,
                            OperationFailure,
                            WriteConcernError,
                            WTimeoutError)
from pymongo.hello_compat import HelloCompat
from pymongo.message import _CursorAddress
from pymongo.operations import IndexModel
from pymongo.son_manipulator import (AutoReference,
                                     NamespaceInjector,
                                     ObjectIdShuffler,
                                     SONManipulator)
from pymongo.write_concern import WriteConcern
from test import client_context, qcheck, unittest, SkipTest
from test.test_client import IntegrationTest
from test.test_bulk import BulkTestBase, BulkAuthorizationTestBase
from test.utils import (DeprecationFilter,
                        joinall,
                        oid_generated_on_process,
                        rs_or_single_client,
                        rs_or_single_client_noauth,
                        single_client,
                        wait_until)


class TestDeprecations(IntegrationTest):

    @classmethod
    def setUpClass(cls):
        super(TestDeprecations, cls).setUpClass()
        cls.deprecation_filter = DeprecationFilter("error")

    @classmethod
    def tearDownClass(cls):
        cls.deprecation_filter.stop()

    def test_save_deprecation(self):
        self.assertRaises(
            DeprecationWarning, lambda: self.db.test.save({}))

    def test_insert_deprecation(self):
        self.assertRaises(
            DeprecationWarning, lambda: self.db.test.insert({}))

    def test_update_deprecation(self):
        self.assertRaises(
            DeprecationWarning, lambda: self.db.test.update({}, {}))

    def test_remove_deprecation(self):
        self.assertRaises(
            DeprecationWarning, lambda: self.db.test.remove({}))

    def test_find_and_modify_deprecation(self):
        self.assertRaises(
            DeprecationWarning,
            lambda: self.db.test.find_and_modify({'i': 5}, {}))

    def test_add_son_manipulator_deprecation(self):
        db = self.client.pymongo_test
        self.assertRaises(DeprecationWarning,
                          lambda: db.add_son_manipulator(AutoReference(db)))

    def test_ensure_index_deprecation(self):
        try:
            self.assertRaises(
                DeprecationWarning,
                lambda: self.db.test.ensure_index('i'))
        finally:
            self.db.test.drop()

    def test_reindex_deprecation(self):
        self.assertRaises(DeprecationWarning, lambda: self.db.test.reindex())

    def test_geoHaystack_deprecation(self):
        self.addCleanup(self.db.test.drop)
        keys = [("pos", GEOHAYSTACK), ("type", ASCENDING)]
        self.assertRaises(
            DeprecationWarning, self.db.test.create_index, keys, bucketSize=1)
        indexes = [IndexModel(keys, bucketSize=1)]
        self.assertRaises(
            DeprecationWarning, self.db.test.create_indexes, indexes)


class TestLegacy(IntegrationTest):

    @classmethod
    def setUpClass(cls):
        super(TestLegacy, cls).setUpClass()
        cls.w = client_context.w
        cls.deprecation_filter = DeprecationFilter()

    @classmethod
    def tearDownClass(cls):
        cls.deprecation_filter.stop()

    def test_insert_find_one(self):
        # Tests legacy insert.
        db = self.db
        db.test.drop()
        self.assertEqual(0, len(list(db.test.find())))
        doc = {"hello": u"world"}
        _id = db.test.insert(doc)
        self.assertEqual(1, len(list(db.test.find())))
        self.assertEqual(doc, db.test.find_one())
        self.assertEqual(doc["_id"], _id)
        self.assertTrue(isinstance(_id, ObjectId))

        doc_class = dict
        # Work around http://bugs.jython.org/issue1728
        if (sys.platform.startswith('java') and
                sys.version_info[:3] >= (2, 5, 2)):
            doc_class = SON

        db = self.client.get_database(
            db.name, codec_options=CodecOptions(document_class=doc_class))

        def remove_insert_find_one(doc):
            db.test.remove({})
            db.test.insert(doc)
            # SON equality is order sensitive.
            return db.test.find_one() == doc.to_dict()

        qcheck.check_unittest(self, remove_insert_find_one,
                              qcheck.gen_mongo_dict(3))

    def test_generator_insert(self):
        # Only legacy insert currently supports insert from a generator.
        db = self.db
        db.test.remove({})
        self.assertEqual(db.test.find().count(), 0)
        db.test.insert(({'a': i} for i in range(5)), manipulate=False)
        self.assertEqual(5, db.test.count())
        db.test.remove({})

        db.test.insert(({'a': i} for i in range(5)), manipulate=True)
        self.assertEqual(5, db.test.count())
        db.test.remove({})

    def test_insert_multiple(self):
        # Tests legacy insert.
        db = self.db
        db.drop_collection("test")
        doc1 = {"hello": u"world"}
        doc2 = {"hello": u"mike"}
        self.assertEqual(db.test.find().count(), 0)
        ids = db.test.insert([doc1, doc2])
        self.assertEqual(db.test.find().count(), 2)
        self.assertEqual(doc1, db.test.find_one({"hello": u"world"}))
        self.assertEqual(doc2, db.test.find_one({"hello": u"mike"}))

        self.assertEqual(2, len(ids))
        self.assertEqual(doc1["_id"], ids[0])
        self.assertEqual(doc2["_id"], ids[1])

        ids = db.test.insert([{"hello": 1}])
        self.assertTrue(isinstance(ids, list))
        self.assertEqual(1, len(ids))

        self.assertRaises(InvalidOperation, db.test.insert, [])

        # Generator that raises StopIteration on first call to next().
        self.assertRaises(InvalidOperation, db.test.insert, (i for i in []))

    def test_insert_multiple_with_duplicate(self):
        # Tests legacy insert.
        db = self.db
        db.drop_collection("test_insert_multiple_with_duplicate")
        collection = db.test_insert_multiple_with_duplicate
        collection.create_index([('i', ASCENDING)], unique=True)

        # No error
        collection.insert([{'i': i} for i in range(5, 10)], w=0)
        wait_until(lambda: 5 == collection.count(), 'insert 5 documents')

        db.drop_collection("test_insert_multiple_with_duplicate")
        collection.create_index([('i', ASCENDING)], unique=True)

        # No error
        collection.insert([{'i': 1}] * 2, w=0)
        wait_until(lambda: 1 == collection.count(), 'insert 1 document')

        self.assertRaises(
            DuplicateKeyError,
            lambda: collection.insert([{'i': 2}] * 2),
        )

        db.drop_collection("test_insert_multiple_with_duplicate")
        db = self.client.get_database(
            db.name, write_concern=WriteConcern(w=0))

        collection = db.test_insert_multiple_with_duplicate
        collection.create_index([('i', ASCENDING)], unique=True)

        # No error.
        collection.insert([{'i': 1}] * 2)
        wait_until(lambda: 1 == collection.count(), 'insert 1 document')

        # Implied acknowledged.
        self.assertRaises(
            DuplicateKeyError,
            lambda: collection.insert([{'i': 2}] * 2, fsync=True),
        )

        # Explicit acknowledged.
        self.assertRaises(
            DuplicateKeyError,
            lambda: collection.insert([{'i': 2}] * 2, w=1))

        db.drop_collection("test_insert_multiple_with_duplicate")

    @client_context.require_replica_set
    def test_insert_prefers_write_errors(self):
        # Tests legacy insert.
        collection = self.db.test_insert_prefers_write_errors
        self.db.drop_collection(collection.name)
        collection.insert_one({'_id': 1})
        large = 's' * 1024 * 1024 * 15
        with self.assertRaises(DuplicateKeyError):
            collection.insert(
                [{'_id': 1, 's': large}, {'_id': 2, 's': large}])
        self.assertEqual(1, collection.count())

        with self.assertRaises(DuplicateKeyError):
            collection.insert(
                [{'_id': 1, 's': large}, {'_id': 2, 's': large}],
                continue_on_error=True)
        self.assertEqual(2, collection.count())
        collection.delete_one({'_id': 2})

        # A writeError followed by a writeConcernError should prefer to raise
        # the writeError.
        with self.assertRaises(DuplicateKeyError):
            collection.insert(
                [{'_id': 1, 's': large}, {'_id': 2, 's': large}],
                continue_on_error=True,
                w=len(client_context.nodes) + 10, wtimeout=1)
        self.assertEqual(2, collection.count())
        collection.delete_many({})

        with self.assertRaises(WriteConcernError):
            collection.insert(
                [{'_id': 1, 's': large}, {'_id': 2, 's': large}],
                continue_on_error=True,
                w=len(client_context.nodes) + 10, wtimeout=1)
        self.assertEqual(2, collection.count())

    def test_insert_iterables(self):
        # Tests legacy insert.
        db = self.db

        self.assertRaises(TypeError, db.test.insert, 4)
        self.assertRaises(TypeError, db.test.insert, None)
        self.assertRaises(TypeError, db.test.insert, True)

        db.drop_collection("test")
        self.assertEqual(db.test.find().count(), 0)
        db.test.insert(({"hello": u"world"}, {"hello": u"world"}))
        self.assertEqual(db.test.find().count(), 2)

        db.drop_collection("test")
        self.assertEqual(db.test.find().count(), 0)
        db.test.insert(map(lambda x: {"hello": "world"},
                           itertools.repeat(None, 10)))
        self.assertEqual(db.test.find().count(), 10)

    def test_insert_manipulate_false(self):
        # Test two aspects of legacy insert with manipulate=False:
        #   1. The return value is None or [None] as appropriate.
        #   2. _id is not set on the passed-in document object.
        collection = self.db.test_insert_manipulate_false
        collection.drop()
        oid = ObjectId()
        doc = {'a': oid}

        try:
            # The return value is None.
            self.assertTrue(collection.insert(doc, manipulate=False) is None)
            # insert() shouldn't set _id on the passed-in document object.
            self.assertEqual({'a': oid}, doc)

            # Bulk insert. The return value is a list of None.
            self.assertEqual([None], collection.insert([{}], manipulate=False))

            docs = [{}, {}]
            ids = collection.insert(docs, manipulate=False)
            self.assertEqual([None, None], ids)
            self.assertEqual([{}, {}], docs)
        finally:
            collection.drop()

    def test_continue_on_error(self):
        # Tests legacy insert.
        db = self.db
        db.drop_collection("test_continue_on_error")
        collection = db.test_continue_on_error
        oid = collection.insert({"one": 1})
        self.assertEqual(1, collection.count())

        docs = []
        docs.append({"_id": oid, "two": 2})  # Duplicate _id.
        docs.append({"three": 3})
        docs.append({"four": 4})
        docs.append({"five": 5})

        with self.assertRaises(DuplicateKeyError):
            collection.insert(docs, manipulate=False)

        self.assertEqual(1, collection.count())

        with self.assertRaises(DuplicateKeyError):
            collection.insert(docs, manipulate=False, continue_on_error=True)

        self.assertEqual(4, collection.count())

        collection.remove({}, w=client_context.w)

        oid = collection.insert({"_id": oid, "one": 1}, w=0)
        wait_until(lambda: 1 == collection.count(), 'insert 1 document')

        docs[0].pop("_id")
        docs[2]["_id"] = oid

        with self.assertRaises(DuplicateKeyError):
            collection.insert(docs, manipulate=False)

        self.assertEqual(3, collection.count())
        collection.insert(docs, manipulate=False, continue_on_error=True, w=0)
        wait_until(lambda: 6 == collection.count(), 'insert 3 documents')

    def test_acknowledged_insert(self):
        # Tests legacy insert.
        db = self.db
        db.drop_collection("test_acknowledged_insert")
        collection = db.test_acknowledged_insert

        a = {"hello": "world"}
        collection.insert(a)
        collection.insert(a, w=0)
        self.assertRaises(OperationFailure,
                          collection.insert, a)

    def test_insert_adds_id(self):
        # Tests legacy insert.
        doc = {"hello": "world"}
        self.db.test.insert(doc)
        self.assertTrue("_id" in doc)

        docs = [{"hello": "world"}, {"hello": "world"}]
        self.db.test.insert(docs)
        for doc in docs:
            self.assertTrue("_id" in doc)

    def test_insert_large_batch(self):
        # Tests legacy insert.
        db = self.client.test_insert_large_batch
        self.addCleanup(self.client.drop_database, 'test_insert_large_batch')
        max_bson_size = self.client.max_bson_size
        # Write commands are limited to 16MB + 16k per batch
        big_string = 'x' * int(max_bson_size / 2)

        # Batch insert that requires 2 batches.
        successful_insert = [{'x': big_string}, {'x': big_string},
                             {'x': big_string}, {'x': big_string}]
        db.collection_0.insert(successful_insert, w=1)
        self.assertEqual(4, db.collection_0.count())

        db.collection_0.drop()

        # Test that inserts fail after first error.
        insert_second_fails = [{'_id': 'id0', 'x': big_string},
                               {'_id': 'id0', 'x': big_string},
                               {'_id': 'id1', 'x': big_string},
                               {'_id': 'id2', 'x': big_string}]

        with self.assertRaises(DuplicateKeyError):
            db.collection_1.insert(insert_second_fails)

        self.assertEqual(1, db.collection_1.count())

        db.collection_1.drop()

        # 2 batches, 2nd insert fails, don't continue on error.
        self.assertTrue(db.collection_2.insert(insert_second_fails, w=0))
        wait_until(lambda: 1 == db.collection_2.count(),
                   'insert 1 document', timeout=60)

        db.collection_2.drop()

        # 2 batches, ids of docs 0 and 1 are dupes, ids of docs 2 and 3 are
        # dupes. Acknowledged, continue on error.
        insert_two_failures = [{'_id': 'id0', 'x': big_string},
                               {'_id': 'id0', 'x': big_string},
                               {'_id': 'id1', 'x': big_string},
                               {'_id': 'id1', 'x': big_string}]

        with self.assertRaises(OperationFailure) as context:
            db.collection_3.insert(insert_two_failures,
                                   continue_on_error=True, w=1)

        self.assertIn('id1', str(context.exception))

        # Only the first and third documents should be inserted.
        self.assertEqual(2, db.collection_3.count())

        db.collection_3.drop()

        # 2 batches, 2 errors, unacknowledged, continue on error.
        db.collection_4.insert(insert_two_failures, continue_on_error=True, w=0)

        # Only the first and third documents are inserted.
        wait_until(lambda: 2 == db.collection_4.count(),
                   'insert 2 documents', timeout=60)

        db.collection_4.drop()

    def test_update(self):
        # Tests legacy update.
        db = self.db
        db.drop_collection("test")

        id1 = db.test.save({"x": 5})
        db.test.update({}, {"$inc": {"x": 1}})
        self.assertEqual(db.test.find_one(id1)["x"], 6)

        id2 = db.test.save({"x": 1})
        db.test.update({"x": 6}, {"$inc": {"x": 1}})
        self.assertEqual(db.test.find_one(id1)["x"], 7)
        self.assertEqual(db.test.find_one(id2)["x"], 1)

    def test_update_manipulate(self):
        # Tests legacy update.
        db = self.db
        db.drop_collection("test")
        db.test.insert({'_id': 1})
        db.test.update({'_id': 1}, {'a': 1}, manipulate=True)
        self.assertEqual(
            {'_id': 1, 'a': 1},
            db.test.find_one())

        class AddField(SONManipulator):
            def transform_incoming(self, son, dummy):
                son['field'] = 'value'
                return son

        db.add_son_manipulator(AddField())
        db.test.update({'_id': 1}, {'a': 2}, manipulate=False)
        self.assertEqual(
            {'_id': 1, 'a': 2},
            db.test.find_one())

        db.test.update({'_id': 1}, {'a': 3}, manipulate=True)
        self.assertEqual(
            {'_id': 1, 'a': 3, 'field': 'value'},
            db.test.find_one())

    def test_update_nmodified(self):
        # Tests legacy update.
        db = self.db
        db.drop_collection("test")
        hello = self.client.admin.command(HelloCompat.LEGACY_CMD)
        used_write_commands = (hello.get("maxWireVersion", 0) > 1)

        db.test.insert({'_id': 1})
        result = db.test.update({'_id': 1}, {'$set': {'x': 1}})
        if used_write_commands:
            self.assertEqual(1, result['nModified'])
        else:
            self.assertFalse('nModified' in result)

        # x is already 1.
        result = db.test.update({'_id': 1}, {'$set': {'x': 1}})
        if used_write_commands:
            self.assertEqual(0, result['nModified'])
        else:
            self.assertFalse('nModified' in result)

    def test_multi_update(self):
        # Tests legacy update.
        db = self.db
        db.drop_collection("test")

        db.test.save({"x": 4, "y": 3})
        db.test.save({"x": 5, "y": 5})
        db.test.save({"x": 4, "y": 4})

        db.test.update({"x": 4}, {"$set": {"y": 5}}, multi=True)

        self.assertEqual(3, db.test.count())
        for doc in db.test.find():
            self.assertEqual(5, doc["y"])

        self.assertEqual(2, db.test.update({"x": 4}, {"$set": {"y": 6}},
                                           multi=True)["n"])

    def test_upsert(self):
        # Tests legacy update.
        db = self.db
        db.drop_collection("test")

        db.test.update({"page": "/"}, {"$inc": {"count": 1}}, upsert=True)
        db.test.update({"page": "/"}, {"$inc": {"count": 1}}, upsert=True)

        self.assertEqual(1, db.test.count())
        self.assertEqual(2, db.test.find_one()["count"])

    def test_acknowledged_update(self):
        # Tests legacy update.
        db = self.db
        db.drop_collection("test_acknowledged_update")
        collection = db.test_acknowledged_update
        collection.create_index("x", unique=True)

        collection.insert({"x": 5})
        _id = collection.insert({"x": 4})

        self.assertEqual(
            None, collection.update({"_id": _id}, {"$inc": {"x": 1}}, w=0))

        self.assertRaises(DuplicateKeyError, collection.update,
                          {"_id": _id}, {"$inc": {"x": 1}})

        self.assertEqual(1, collection.update({"_id": _id},
                                              {"$inc": {"x": 2}})["n"])

        self.assertEqual(0, collection.update({"_id": "foo"},
                                              {"$inc": {"x": 2}})["n"])
        db.drop_collection("test_acknowledged_update")

    def test_update_backward_compat(self):
        # MongoDB versions >= 2.6.0 don't return the updatedExisting field
        # and return upsert _id in an array subdocument. This test should
        # pass regardless of server version or type (mongod/s).
        # Tests legacy update.
        c = self.db.test
        c.drop()
        oid = ObjectId()
        res = c.update({'_id': oid}, {'$set': {'a': 'a'}}, upsert=True)
        self.assertFalse(res.get('updatedExisting'))
        self.assertEqual(oid, res.get('upserted'))

        res = c.update({'_id': oid}, {'$set': {'b': 'b'}})
        self.assertTrue(res.get('updatedExisting'))

    def test_save(self):
        # Tests legacy save.
        self.db.drop_collection("test_save")
        collection = self.db.test_save

        # Save a doc with autogenerated id
        _id = collection.save({"hello": "world"})
        self.assertEqual(collection.find_one()["_id"], _id)
        self.assertTrue(isinstance(_id, ObjectId))

        # Save a doc with explicit id
        collection.save({"_id": "explicit_id", "hello": "bar"})
        doc = collection.find_one({"_id": "explicit_id"})
        self.assertEqual(doc['_id'], 'explicit_id')
        self.assertEqual(doc['hello'], 'bar')

        # Save docs with _id field already present (shouldn't create new docs)
        self.assertEqual(2, collection.count())
        collection.save({'_id': _id, 'hello': 'world'})
        self.assertEqual(2, collection.count())
        collection.save({'_id': 'explicit_id', 'hello': 'baz'})
        self.assertEqual(2, collection.count())
        self.assertEqual(
            'baz',
            collection.find_one({'_id': 'explicit_id'})['hello']
        )

        # Acknowledged mode.
        collection.create_index("hello", unique=True)
        # No exception, even though we duplicate the first doc's "hello" value
        collection.save({'_id': 'explicit_id', 'hello': 'world'}, w=0)

        self.assertRaises(
            DuplicateKeyError,
            collection.save,
            {'_id': 'explicit_id', 'hello': 'world'})
        self.db.drop_collection("test")

    def test_save_with_invalid_key(self):
        if client_context.version.at_least(3, 5, 8):
            raise SkipTest("MongoDB >= 3.5.8 allows dotted fields in updates")
        # Tests legacy save.
        self.db.drop_collection("test")
        self.assertTrue(self.db.test.insert({"hello": "world"}))
        doc = self.db.test.find_one()
        doc['a.b'] = 'c'
        self.assertRaises(OperationFailure, self.db.test.save, doc)

    def test_acknowledged_save(self):
        # Tests legacy save.
        db = self.db
        db.drop_collection("test_acknowledged_save")
        collection = db.test_acknowledged_save
        collection.create_index("hello", unique=True)

        collection.save({"hello": "world"})
        collection.save({"hello": "world"}, w=0)
        self.assertRaises(DuplicateKeyError, collection.save,
                          {"hello": "world"})
        db.drop_collection("test_acknowledged_save")

    def test_save_adds_id(self):
        # Tests legacy save.
        doc = {"hello": "jesse"}
        self.db.test.save(doc)
        self.assertTrue("_id" in doc)

    def test_save_returns_id(self):
        doc = {"hello": "jesse"}
        _id = self.db.test.save(doc)
        self.assertTrue(isinstance(_id, ObjectId))
        self.assertEqual(_id, doc["_id"])
        doc["hi"] = "bernie"
        _id = self.db.test.save(doc)
        self.assertTrue(isinstance(_id, ObjectId))
        self.assertEqual(_id, doc["_id"])

    def test_remove_one(self):
        # Tests legacy remove.
        self.db.test.remove()
        self.assertEqual(0, self.db.test.count())

        self.db.test.insert({"x": 1})
        self.db.test.insert({"y": 1})
        self.db.test.insert({"z": 1})
        self.assertEqual(3, self.db.test.count())

        self.db.test.remove(multi=False)
        self.assertEqual(2, self.db.test.count())
        self.db.test.remove()
        self.assertEqual(0, self.db.test.count())

    def test_remove_all(self):
        # Tests legacy remove.
        self.db.test.remove()
        self.assertEqual(0, self.db.test.count())

        self.db.test.insert({"x": 1})
        self.db.test.insert({"y": 1})
        self.assertEqual(2, self.db.test.count())

        self.db.test.remove()
        self.assertEqual(0, self.db.test.count())

    def test_remove_non_objectid(self):
        # Tests legacy remove.
        db = self.db
        db.drop_collection("test")

        db.test.insert_one({"_id": 5})

        self.assertEqual(1, db.test.count())
        db.test.remove(5)
        self.assertEqual(0, db.test.count())

    def test_write_large_document(self):
        # Tests legacy insert, save, and update.
        max_size = self.db.client.max_bson_size
        half_size = int(max_size / 2)
        self.assertEqual(max_size, 16777216)

        self.assertRaises(OperationFailure, self.db.test.insert,
                          {"foo": "x" * max_size})
        self.assertRaises(OperationFailure, self.db.test.save,
                          {"foo": "x" * max_size})
        self.assertRaises(OperationFailure, self.db.test.insert,
                          [{"x": 1}, {"foo": "x" * max_size}])
        self.db.test.insert([{"foo": "x" * half_size},
                             {"foo": "x" * half_size}])

        self.db.test.insert({"bar": "x"})
        # Use w=0 here to test legacy doc size checking in all server versions
        self.assertRaises(DocumentTooLarge, self.db.test.update,
                          {"bar": "x"}, {"bar": "x" * (max_size - 14)}, w=0)
        # This will pass with OP_UPDATE or the update command.
        self.db.test.update({"bar": "x"}, {"bar": "x" * (max_size - 32)})

    def test_last_error_options(self):
        # Tests legacy write methods.
        self.db.test.save({"x": 1}, w=1, wtimeout=1)
        self.db.test.insert({"x": 1}, w=1, wtimeout=1)
        self.db.test.remove({"x": 1}, w=1, wtimeout=1)
        self.db.test.update({"x": 1}, {"y": 2}, w=1, wtimeout=1)

        if client_context.replica_set_name:
            # client_context.w is the number of hosts in the replica set
            w = client_context.w + 1

            # MongoDB 2.8+ raises error code 100, CannotSatisfyWriteConcern,
            # if w > number of members. Older versions just time out after 1 ms
            # as if they had enough secondaries but some are lagging. They
            # return an error with 'wtimeout': True and no code.
            def wtimeout_err(f, *args, **kwargs):
                try:
                    f(*args, **kwargs)
                except WTimeoutError as exc:
                    self.assertIsNotNone(exc.details)
                except OperationFailure as exc:
                    self.assertIsNotNone(exc.details)
                    self.assertEqual(100, exc.code,
                                     "Unexpected error: %r" % exc)
                else:
                    self.fail("%s should have failed" % f)

            coll = self.db.test
            wtimeout_err(coll.save, {"x": 1}, w=w, wtimeout=1)
            wtimeout_err(coll.insert, {"x": 1}, w=w, wtimeout=1)
            wtimeout_err(coll.update, {"x": 1}, {"y": 2}, w=w, wtimeout=1)
            wtimeout_err(coll.remove, {"x": 1}, w=w, wtimeout=1)

        # can't use fsync and j options together
        self.assertRaises(ConfigurationError, self.db.test.insert,
                          {"_id": 1}, j=True, fsync=True)

    def test_find_and_modify(self):
        c = self.db.test
        c.drop()
        c.insert({'_id': 1, 'i': 1})

        # Test that we raise DuplicateKeyError when appropriate.
        c.ensure_index('i', unique=True)
        self.assertRaises(DuplicateKeyError,
                          c.find_and_modify, query={'i': 1, 'j': 1},
                          update={'$set': {'k': 1}}, upsert=True)
        c.drop_indexes()

        # Test correct findAndModify
        self.assertEqual({'_id': 1, 'i': 1},
                         c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}}))
        self.assertEqual({'_id': 1, 'i': 3},
                         c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}},
                                           new=True))

        self.assertEqual({'_id': 1, 'i': 3},
                         c.find_and_modify({'_id': 1}, remove=True))

        self.assertEqual(None, c.find_one({'_id': 1}))

        self.assertEqual(None,
                         c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}}))
        self.assertEqual(None, c.find_and_modify({'_id': 1},
                                                 {'$inc': {'i': 1}},
                                                 upsert=True))
        self.assertEqual({'_id': 1, 'i': 2},
                         c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}},
                                           upsert=True, new=True))

        self.assertEqual({'_id': 1, 'i': 2},
                         c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}},
                                           fields=['i']))
        self.assertEqual({'_id': 1, 'i': 4},
                         c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}},
                                           new=True, fields={'i': 1}))

        # Test with full_response=True.
        result = c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}},
                                   new=True, upsert=True,
                                   full_response=True,
                                   fields={'i': 1})
        self.assertEqual({'_id': 1, 'i': 5}, result["value"])
        self.assertEqual(True,
                         result["lastErrorObject"]["updatedExisting"])

        result = c.find_and_modify({'_id': 2}, {'$inc': {'i': 1}},
                                   new=True, upsert=True,
                                   full_response=True,
                                   fields={'i': 1})
        self.assertEqual({'_id': 2, 'i': 1}, result["value"])
        self.assertEqual(False,
                         result["lastErrorObject"]["updatedExisting"])

        class ExtendedDict(dict):
            pass

        result = c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}},
                                   new=True, fields={'i': 1})
        self.assertFalse(isinstance(result, ExtendedDict))
        c = self.db.get_collection(
            "test", codec_options=CodecOptions(document_class=ExtendedDict))
        result = c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}},
                                   new=True, fields={'i': 1})
        self.assertTrue(isinstance(result, ExtendedDict))

    def test_find_and_modify_with_sort(self):
        c = self.db.test
        c.drop()
        for j in range(5):
            c.insert({'j': j, 'i': 0})

        sort = {'j': DESCENDING}
        self.assertEqual(4, c.find_and_modify({},
                                              {'$inc': {'i': 1}},
                                              sort=sort)['j'])
        sort = {'j': ASCENDING}
        self.assertEqual(0, c.find_and_modify({},
                                              {'$inc': {'i': 1}},
                                              sort=sort)['j'])
        sort = [('j', DESCENDING)]
        self.assertEqual(4, c.find_and_modify({},
                                              {'$inc': {'i': 1}},
                                              sort=sort)['j'])
        sort = [('j', ASCENDING)]
        self.assertEqual(0, c.find_and_modify({},
                                              {'$inc': {'i': 1}},
                                              sort=sort)['j'])
        sort = SON([('j', DESCENDING)])
        self.assertEqual(4, c.find_and_modify({},
                                              {'$inc': {'i': 1}},
                                              sort=sort)['j'])
        sort = SON([('j', ASCENDING)])
        self.assertEqual(0, c.find_and_modify({},
                                              {'$inc': {'i': 1}},
                                              sort=sort)['j'])

        try:
            from collections import OrderedDict
            sort = OrderedDict([('j', DESCENDING)])
            self.assertEqual(4, c.find_and_modify({},
                                                  {'$inc': {'i': 1}},
                                                  sort=sort)['j'])
            sort = OrderedDict([('j', ASCENDING)])
            self.assertEqual(0, c.find_and_modify({},
                                                  {'$inc': {'i': 1}},
                                                  sort=sort)['j'])
        except ImportError:
            pass
        # Test that a standard dict with two keys is rejected.
        sort = {'j': DESCENDING, 'foo': DESCENDING}
        self.assertRaises(TypeError, c.find_and_modify,
                          {}, {'$inc': {'i': 1}}, sort=sort)

    def test_find_and_modify_with_manipulator(self):
        class AddCollectionNameManipulator(SONManipulator):
            def will_copy(self):
                return True

            def transform_incoming(self, son, dummy):
                copy = SON(son)
                if 'collection' in copy:
                    del copy['collection']
                return copy

            def transform_outgoing(self, son, collection):
                copy = SON(son)
                copy['collection'] = collection.name
                return copy

        db = self.client.pymongo_test
        db.add_son_manipulator(AddCollectionNameManipulator())

        c = db.test
        c.drop()
        c.insert({'_id': 1, 'i': 1})

        # Test correct findAndModify
        # With manipulators
        self.assertEqual({'_id': 1, 'i': 1, 'collection': 'test'},
                         c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}},
                                           manipulate=True))
        self.assertEqual({'_id': 1, 'i': 3, 'collection': 'test'},
                         c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}},
                                           new=True, manipulate=True))
        # With out manipulators
        self.assertEqual({'_id': 1, 'i': 3},
                         c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}}))
        self.assertEqual({'_id': 1, 'i': 5},
                         c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}},
                                           new=True))

    @client_context.require_version_max(4, 1, 0, -1)
    def test_group(self):
        db = self.db
        db.drop_collection("test")

        self.assertEqual([],
                         db.test.group([], {}, {"count": 0},
                                       "function (obj, prev) { prev.count++; }"
                                      ))

        db.test.insert_many([{"a": 2}, {"b": 5}, {"a": 1}])

        self.assertEqual([{"count": 3}],
                         db.test.group([], {}, {"count": 0},
                                       "function (obj, prev) { prev.count++; }"
                                      ))

        self.assertEqual([{"count": 1}],
                         db.test.group([], {"a": {"$gt": 1}}, {"count": 0},
                                       "function (obj, prev) { prev.count++; }"
                                      ))

        db.test.insert_one({"a": 2, "b": 3})

        self.assertEqual([{"a": 2, "count": 2},
                          {"a": None, "count": 1},
                          {"a": 1, "count": 1}],
                         db.test.group(["a"], {}, {"count": 0},
                                       "function (obj, prev) { prev.count++; }"
                                      ))

        # modifying finalize
        self.assertEqual([{"a": 2, "count": 3},
                          {"a": None, "count": 2},
                          {"a": 1, "count": 2}],
                         db.test.group(["a"], {}, {"count": 0},
                                       "function (obj, prev) "
                                       "{ prev.count++; }",
                                       "function (obj) { obj.count++; }"))

        # returning finalize
        self.assertEqual([2, 1, 1],
                         db.test.group(["a"], {}, {"count": 0},
                                       "function (obj, prev) "
                                       "{ prev.count++; }",
                                       "function (obj) { return obj.count; }"))

        # keyf
        self.assertEqual([2, 2],
                         db.test.group("function (obj) { if (obj.a == 2) "
                                       "{ return {a: true} }; "
                                       "return {b: true}; }", {}, {"count": 0},
                                       "function (obj, prev) "
                                       "{ prev.count++; }",
                                       "function (obj) { return obj.count; }"))

        # no key
        self.assertEqual([{"count": 4}],
                         db.test.group(None, {}, {"count": 0},
                                       "function (obj, prev) { prev.count++; }"
                                      ))

        self.assertRaises(OperationFailure, db.test.group,
                          [], {}, {}, "5 ++ 5")

    @client_context.require_version_max(4, 1, 0, -1)
    def test_group_with_scope(self):
        db = self.db
        db.drop_collection("test")
        db.test.insert_many([{"a": 1}, {"b": 1}])

        reduce_function = "function (obj, prev) { prev.count += inc_value; }"

        self.assertEqual(2, db.test.group([], {}, {"count": 0},
                                          Code(reduce_function,
                                               {"inc_value": 1}))[0]['count'])
        self.assertEqual(4, db.test.group([], {}, {"count": 0},
                                          Code(reduce_function,
                                               {"inc_value": 2}))[0]['count'])

        self.assertEqual(1,
                         db.test.group([], {}, {"count": 0},
                                       Code(reduce_function,
                                            {"inc_value": 0.5}))[0]['count'])

        self.assertEqual(2, db.test.group(
            [], {}, {"count": 0},
            Code(reduce_function, {"inc_value": 1}))[0]['count'])

        self.assertEqual(4, db.test.group(
            [], {}, {"count": 0},
            Code(reduce_function, {"inc_value": 2}))[0]['count'])

        self.assertEqual(1, db.test.group(
            [], {}, {"count": 0},
            Code(reduce_function, {"inc_value": 0.5}))[0]['count'])

    @client_context.require_version_max(4, 1, 0, -1)
    def test_group_uuid_representation(self):
        db = self.db
        coll = db.uuid
        coll.drop()
        uu = uuid.uuid4()
        coll.insert_one({"_id": uu, "a": 2})
        coll.insert_one({"_id": uuid.uuid4(), "a": 1})

        reduce = "function (obj, prev) { prev.count++; }"
        coll = self.db.get_collection(
            "uuid", CodecOptions(uuid_representation=STANDARD))
        self.assertEqual([],
                         coll.group([], {"_id": uu},
                                    {"count": 0}, reduce))
        coll = self.db.get_collection(
            "uuid", CodecOptions(uuid_representation=PYTHON_LEGACY))
        self.assertEqual([{"count": 1}],
                         coll.group([], {"_id": uu},
                                    {"count": 0}, reduce))

    @client_context.require_version_max(5, 0, 99)
    def test_last_status(self):
        # Skip versions like "v5.0.0-alpha0-1768-gfda1dfa" which equals:
        # Version(5, 0, 1, -1)
        if (client_context.version[:2] == (5, 0) and
                client_context.version[-1] == -1):
            self.skipTest('getLastError is not supported')
        # Tests many legacy API elements.
        # We must call getlasterror on same socket as the last operation.
        db = rs_or_single_client(maxPoolSize=1).pymongo_test
        collection = db.test_last_status
        collection.remove({})
        collection.save({"i": 1})

        collection.update({"i": 1}, {"$set": {"i": 2}}, w=0)
        # updatedExisting is always false on mongos after an OP_MSG
        # unacknowledged write.
        if not (client_context.version >= (3, 6) and client_context.is_mongos):
            self.assertTrue(db.last_status()["updatedExisting"])
        wait_until(lambda: collection.find_one({"i": 2}),
                   "found updated w=0 doc")

        collection.update({"i": 1}, {"$set": {"i": 500}}, w=0)
        self.assertFalse(db.last_status()["updatedExisting"])

    def test_auto_ref_and_deref(self):
        # Legacy API.
        db = self.client.pymongo_test
        db.add_son_manipulator(AutoReference(db))
        db.add_son_manipulator(NamespaceInjector())

        db.test.a.remove({})
        db.test.b.remove({})
        db.test.c.remove({})

        a = {"hello": u"world"}
        db.test.a.save(a)

        b = {"test": a}
        db.test.b.save(b)

        c = {"another test": b}
        db.test.c.save(c)

        a["hello"] = "mike"
        db.test.a.save(a)

        self.assertEqual(db.test.a.find_one(), a)
        self.assertEqual(db.test.b.find_one()["test"], a)
        self.assertEqual(db.test.c.find_one()["another test"]["test"], a)
        self.assertEqual(db.test.b.find_one(), b)
        self.assertEqual(db.test.c.find_one()["another test"], b)
        self.assertEqual(db.test.c.find_one(), c)

    def test_auto_ref_and_deref_list(self):
        # Legacy API.
        db = self.client.pymongo_test
        db.add_son_manipulator(AutoReference(db))
        db.add_son_manipulator(NamespaceInjector())

        db.drop_collection("users")
        db.drop_collection("messages")

        message_1 = {"title": "foo"}
        db.messages.save(message_1)
        message_2 = {"title": "bar"}
        db.messages.save(message_2)

        user = {"messages": [message_1, message_2]}
        db.users.save(user)
        db.messages.update(message_1, {"title": "buzz"})

        self.assertEqual("buzz", db.users.find_one()["messages"][0]["title"])
        self.assertEqual("bar", db.users.find_one()["messages"][1]["title"])

    def test_object_to_dict_transformer(self):
        # PYTHON-709: Some users rely on their custom SONManipulators to run
        # before any other checks, so they can insert non-dict objects and
        # have them dictified before the _id is inserted or any other
        # processing.
        # Tests legacy API elements.
        class Thing(object):
            def __init__(self, value):
                self.value = value

        class ThingTransformer(SONManipulator):
            def transform_incoming(self, thing, dummy):
                return {'value': thing.value}

        db = self.client.foo
        db.add_son_manipulator(ThingTransformer())
        t = Thing('value')

        db.test.remove()
        db.test.insert([t])
        out = db.test.find_one()
        self.assertEqual('value', out.get('value'))

    def test_son_manipulator_outgoing(self):
        class Thing(object):
            def __init__(self, value):
                self.value = value

        class ThingTransformer(SONManipulator):
            def transform_outgoing(self, doc, collection):
                # We don't want this applied to the command return
                # value in pymongo.cursor.Cursor.
                if 'value' in doc:
                    return Thing(doc['value'])
                return doc

        db = self.client.foo
        db.add_son_manipulator(ThingTransformer())

        db.test.delete_many({})
        db.test.insert_one({'value': 'value'})
        out = db.test.find_one()
        self.assertTrue(isinstance(out, Thing))
        self.assertEqual('value', out.value)

        out = next(db.test.aggregate([], cursor={}))
        self.assertTrue(isinstance(out, Thing))
        self.assertEqual('value', out.value)

    def test_son_manipulator_inheritance(self):
        # Tests legacy API elements.
        class Thing(object):
            def __init__(self, value):
                self.value = value

        class ThingTransformer(SONManipulator):
            def transform_incoming(self, thing, dummy):
                return {'value': thing.value}

            def transform_outgoing(self, son, dummy):
                return Thing(son['value'])

        class Child(ThingTransformer):
            pass

        db = self.client.foo
        db.add_son_manipulator(Child())
        t = Thing('value')

        db.test.remove()
        db.test.insert([t])
        out = db.test.find_one()
        self.assertTrue(isinstance(out, Thing))
        self.assertEqual('value', out.value)

    def test_disabling_manipulators(self):

        class IncByTwo(SONManipulator):
            def transform_outgoing(self, son, collection):
                if 'foo' in son:
                    son['foo'] += 2
                return son

        db = self.client.pymongo_test
        db.add_son_manipulator(IncByTwo())
        c = db.test
        c.drop()
        c.insert({'foo': 0})
        self.assertEqual(2, c.find_one()['foo'])
        self.assertEqual(0, c.find_one(manipulate=False)['foo'])

        self.assertEqual(2, c.find_one(manipulate=True)['foo'])
        c.drop()

    def test_manipulator_properties(self):
        db = self.client.foo
        self.assertEqual([], db.incoming_manipulators)
        self.assertEqual([], db.incoming_copying_manipulators)
        self.assertEqual([], db.outgoing_manipulators)
        self.assertEqual([], db.outgoing_copying_manipulators)
        db.add_son_manipulator(AutoReference(db))
        db.add_son_manipulator(NamespaceInjector())
        db.add_son_manipulator(ObjectIdShuffler())
        self.assertEqual(1, len(db.incoming_manipulators))
        self.assertEqual(db.incoming_manipulators, ['NamespaceInjector'])
        self.assertEqual(2, len(db.incoming_copying_manipulators))
        for name in db.incoming_copying_manipulators:
            self.assertTrue(name in ('ObjectIdShuffler', 'AutoReference'))
        self.assertEqual([], db.outgoing_manipulators)
        self.assertEqual(['AutoReference'],
                         db.outgoing_copying_manipulators)

    def test_ensure_index(self):
        db = self.db

        self.assertRaises(TypeError, db.test.ensure_index, {"hello": 1})
        self.assertRaises(TypeError,
                          db.test.ensure_index, {"hello": 1}, cache_for='foo')

        db.test.drop_indexes()

        self.assertEqual("goodbye_1",
                         db.test.ensure_index("goodbye"))
        self.assertEqual(None, db.test.ensure_index("goodbye"))

        db.test.drop_indexes()
        self.assertEqual("foo",
                         db.test.ensure_index("goodbye", name="foo"))
        self.assertEqual(None, db.test.ensure_index("goodbye", name="foo"))

        db.test.drop_indexes()
        self.assertEqual("goodbye_1",
                         db.test.ensure_index("goodbye"))
        self.assertEqual(None, db.test.ensure_index("goodbye"))

        db.test.drop_index("goodbye_1")
        self.assertEqual("goodbye_1",
                         db.test.ensure_index("goodbye"))
        self.assertEqual(None, db.test.ensure_index("goodbye"))

        db.drop_collection("test")
        self.assertEqual("goodbye_1",
                         db.test.ensure_index("goodbye"))
        self.assertEqual(None, db.test.ensure_index("goodbye"))

        db.test.drop_index("goodbye_1")
        self.assertEqual("goodbye_1",
                         db.test.ensure_index("goodbye"))
        self.assertEqual(None, db.test.ensure_index("goodbye"))

        db.test.drop_index("goodbye_1")
        self.assertEqual("goodbye_1",
                         db.test.ensure_index("goodbye", cache_for=1))
        time.sleep(1.2)
        self.assertEqual("goodbye_1",
                         db.test.ensure_index("goodbye"))
        # Make sure the expiration time is updated.
        self.assertEqual(None,
                         db.test.ensure_index("goodbye"))

        # Clean up indexes for later tests
        db.test.drop_indexes()

    @client_context.require_version_max(4, 1)  # PYTHON-1734
    def test_ensure_index_threaded(self):
        coll = self.db.threaded_index_creation
        index_docs = []

        class Indexer(threading.Thread):
            def run(self):
                coll.ensure_index('foo0')
                coll.ensure_index('foo1')
                coll.ensure_index('foo2')
                index_docs.append(coll.index_information())

        try:
            threads = []
            for _ in range(10):
                t = Indexer()
                t.setDaemon(True)
                threads.append(t)

            for thread in threads:
                thread.start()

            joinall(threads)

            first = index_docs[0]
            for index_doc in index_docs[1:]:
                self.assertEqual(index_doc, first)
        finally:
            coll.drop()

    def test_ensure_purge_index_threaded(self):
        coll = self.db.threaded_index_creation

        class Indexer(threading.Thread):
            def run(self):
                coll.ensure_index('foo')
                try:
                    coll.drop_index('foo')
                except OperationFailure:
                    # The index may have already been dropped.
                    pass
                coll.ensure_index('foo')
                coll.drop_indexes()
                coll.create_index('foo')

        try:
            threads = []
            for _ in range(10):
                t = Indexer()
                t.setDaemon(True)
                threads.append(t)

            for thread in threads:
                thread.start()

            joinall(threads)

            self.assertTrue('foo_1' in coll.index_information())
        finally:
            coll.drop()

    @client_context.require_version_max(4, 1)  # PYTHON-1734
    def test_ensure_unique_index_threaded(self):
        coll = self.db.test_unique_threaded
        coll.drop()
        coll.insert_many([{'foo': i} for i in range(10000)])

        class Indexer(threading.Thread):
            def run(self):
                try:
                    coll.ensure_index('foo', unique=True)
                    coll.insert_one({'foo': 'bar'})
                    coll.insert_one({'foo': 'bar'})
                except OperationFailure:
                    pass

        threads = []
        for _ in range(10):
            t = Indexer()
            t.setDaemon(True)
            threads.append(t)

        for i in range(10):
            threads[i].start()

        joinall(threads)

        self.assertEqual(10001, coll.count())
        coll.drop()

    def test_kill_cursors_with_cursoraddress(self):
        coll = self.client.pymongo_test.test
        coll.drop()

        coll.insert_many([{'_id': i} for i in range(200)])
        cursor = coll.find().batch_size(1)
        next(cursor)
        self.client.kill_cursors(
            [cursor.cursor_id],
            _CursorAddress(self.client.address, coll.full_name))

        # Prevent killcursors from reaching the server while a getmore is in
        # progress -- the server logs "Assertion: 16089:Cannot kill active
        # cursor."
        time.sleep(2)

        def raises_cursor_not_found():
            try:
                next(cursor)
                return False
            except CursorNotFound:
                return True

        wait_until(raises_cursor_not_found, 'close cursor')

    @client_context.require_version_max(4, 9, 99)  # SERVER-57457
    def test_kill_cursors_with_tuple(self):
        # Some evergreen distros (Debian 7.1) still test against 3.6.5 where
        # OP_KILL_CURSORS does not work.
        if (client_context.is_mongos and client_context.auth_enabled and
                (3, 6, 0) <= client_context.version < (3, 6, 6)):
            raise SkipTest("SERVER-33553 This server version does not support "
                           "OP_KILL_CURSORS")

        coll = self.client.pymongo_test.test
        coll.drop()

        coll.insert_many([{'_id': i} for i in range(200)])
        cursor = coll.find().batch_size(1)
        next(cursor)
        self.client.kill_cursors(
            [cursor.cursor_id],
            self.client.address)

        # Prevent killcursors from reaching the server while a getmore is in
        # progress -- the server logs "Assertion: 16089:Cannot kill active
        # cursor."
        time.sleep(2)

        def raises_cursor_not_found():
            try:
                next(cursor)
                return False
            except CursorNotFound:
                return True

        wait_until(raises_cursor_not_found, 'close cursor')


class TestLegacyBulk(BulkTestBase):

    @classmethod
    def setUpClass(cls):
        super(TestLegacyBulk, cls).setUpClass()
        cls.deprecation_filter = DeprecationFilter()

    @classmethod
    def tearDownClass(cls):
        cls.deprecation_filter.stop()

    def test_empty(self):
        bulk = self.coll.initialize_ordered_bulk_op()
        self.assertRaises(InvalidOperation, bulk.execute)

    def test_find(self):
        # find() requires a selector.
        bulk = self.coll.initialize_ordered_bulk_op()
        self.assertRaises(TypeError, bulk.find)
        self.assertRaises(TypeError, bulk.find, 'foo')
        # No error.
        bulk.find({})

    @client_context.require_version_min(3, 1, 9, -1)
    def test_bypass_document_validation_bulk_op(self):

        # Test insert
        self.coll.insert_one({"z": 0})
        self.db.command(SON([("collMod", "test"),
                             ("validator", {"z": {"$gte": 0}})]))
        bulk = self.coll.initialize_ordered_bulk_op(
            bypass_document_validation=False)
        bulk.insert({"z": -1}) # error
        self.assertRaises(BulkWriteError, bulk.execute)
        self.assertEqual(0, self.coll.count({"z": -1}))

        bulk = self.coll.initialize_ordered_bulk_op(
            bypass_document_validation=True)
        bulk.insert({"z": -1})
        bulk.execute()
        self.assertEqual(1, self.coll.count({"z": -1}))

        self.coll.insert_one({"z": 0})
        self.db.command(SON([("collMod", "test"),
                             ("validator", {"z": {"$gte": 0}})]))
        bulk = self.coll.initialize_unordered_bulk_op(
            bypass_document_validation=False)
        bulk.insert({"z": -1}) # error
        self.assertRaises(BulkWriteError, bulk.execute)
        self.assertEqual(1, self.coll.count({"z": -1}))

        bulk = self.coll.initialize_unordered_bulk_op(
            bypass_document_validation=True)
        bulk.insert({"z": -1})
        bulk.execute()
        self.assertEqual(2, self.coll.count({"z": -1}))
        self.coll.drop()

    def test_insert(self):
        expected = {
            'nMatched': 0,
            'nModified': 0,
            'nUpserted': 0,
            'nInserted': 1,
            'nRemoved': 0,
            'upserted': [],
            'writeErrors': [],
            'writeConcernErrors': []
        }

        bulk = self.coll.initialize_ordered_bulk_op()
        self.assertRaises(TypeError, bulk.insert, 1)

        # find() before insert() is prohibited.
        self.assertRaises(AttributeError, lambda: bulk.find({}).insert({}))

        # We don't allow multiple documents per call.
        self.assertRaises(TypeError, bulk.insert, [{}, {}])
        self.assertRaises(TypeError, bulk.insert, ({} for _ in range(2)))

        bulk.insert({})
        result = bulk.execute()
        self.assertEqualResponse(expected, result)

        self.assertEqual(1, self.coll.count())
        doc = self.coll.find_one()
        self.assertTrue(oid_generated_on_process(doc['_id']))

        bulk = self.coll.initialize_unordered_bulk_op()
        bulk.insert({})
        result = bulk.execute()
        self.assertEqualResponse(expected, result)

        self.assertEqual(2, self.coll.count())

    def test_update(self):

        expected = {
            'nMatched': 2,
            'nModified': 2,
            'nUpserted': 0,
            'nInserted': 0,
            'nRemoved': 0,
            'upserted': [],
            'writeErrors': [],
            'writeConcernErrors': []
        }
        self.coll.insert_many([{}, {}])

        bulk = self.coll.initialize_ordered_bulk_op()

        # update() requires find() first.
        self.assertRaises(
            AttributeError,
            lambda: bulk.update({'$set': {'x': 1}}))

        self.assertRaises(TypeError, bulk.find({}).update, 1)
        self.assertRaises(ValueError, bulk.find({}).update, {})

        # All fields must be $-operators.
        self.assertRaises(ValueError, bulk.find({}).update, {'foo': 'bar'})
        bulk.find({}).update({'$set': {'foo': 'bar'}})
        result = bulk.execute()
        self.assertEqualResponse(expected, result)
        self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 2)

        # All fields must be $-operators -- validated server-side.
        bulk = self.coll.initialize_ordered_bulk_op()
        updates = SON([('$set', {'x': 1}), ('y', 1)])
        bulk.find({}).update(updates)
        self.assertRaises(BulkWriteError, bulk.execute)

        self.coll.delete_many({})
        self.coll.insert_many([{}, {}])

        bulk = self.coll.initialize_unordered_bulk_op()
        bulk.find({}).update({'$set': {'bim': 'baz'}})
        result = bulk.execute()
        self.assertEqualResponse(
            {'nMatched': 2,
             'nModified': 2,
             'nUpserted': 0,
             'nInserted': 0,
             'nRemoved': 0,
             'upserted': [],
             'writeErrors': [],
             'writeConcernErrors': []},
            result)

        self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 2)

        self.coll.insert_one({'x': 1})
        bulk = self.coll.initialize_unordered_bulk_op()
        bulk.find({'x': 1}).update({'$set': {'x': 42}})
        result = bulk.execute()
        self.assertEqualResponse(
            {'nMatched': 1,
             'nModified': 1,
             'nUpserted': 0,
             'nInserted': 0,
             'nRemoved': 0,
             'upserted': [],
             'writeErrors': [],
             'writeConcernErrors': []},
            result)

        self.assertEqual(1, self.coll.find({'x': 42}).count())

        # Second time, x is already 42 so nModified is 0.
        bulk = self.coll.initialize_unordered_bulk_op()
        bulk.find({'x': 42}).update({'$set': {'x': 42}})
        result = bulk.execute()
        self.assertEqualResponse(
            {'nMatched': 1,
             'nModified': 0,
             'nUpserted': 0,
             'nInserted': 0,
             'nRemoved': 0,
             'upserted': [],
             'writeErrors': [],
             'writeConcernErrors': []},
            result)

    def test_update_one(self):

        expected = {
            'nMatched': 1,
            'nModified': 1,
            'nUpserted': 0,
            'nInserted': 0,
            'nRemoved': 0,
            'upserted': [],
            'writeErrors': [],
            'writeConcernErrors': []
        }

        self.coll.insert_many([{}, {}])

        bulk = self.coll.initialize_ordered_bulk_op()

        # update_one() requires find() first.
        self.assertRaises(
            AttributeError,
            lambda: bulk.update_one({'$set': {'x': 1}}))

        self.assertRaises(TypeError, bulk.find({}).update_one, 1)
        self.assertRaises(ValueError, bulk.find({}).update_one, {})
        self.assertRaises(ValueError, bulk.find({}).update_one, {'foo': 'bar'})
        bulk.find({}).update_one({'$set': {'foo': 'bar'}})
        result = bulk.execute()
        self.assertEqualResponse(expected, result)

        self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 1)

        self.coll.delete_many({})
        self.coll.insert_many([{}, {}])

        bulk = self.coll.initialize_unordered_bulk_op()
        bulk.find({}).update_one({'$set': {'bim': 'baz'}})
        result = bulk.execute()
        self.assertEqualResponse(expected, result)

        self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1)

        # All fields must be $-operators -- validated server-side.
        bulk = self.coll.initialize_ordered_bulk_op()
        updates = SON([('$set', {'x': 1}), ('y', 1)])
        bulk.find({}).update_one(updates)
        self.assertRaises(BulkWriteError, bulk.execute)

    def test_replace_one(self):

        expected = {
            'nMatched': 1,
            'nModified': 1,
            'nUpserted': 0,
            'nInserted': 0,
            'nRemoved': 0,
            'upserted': [],
            'writeErrors': [],
            'writeConcernErrors': []
        }

        self.coll.insert_many([{}, {}])

        bulk = self.coll.initialize_ordered_bulk_op()
        self.assertRaises(TypeError, bulk.find({}).replace_one, 1)
        self.assertRaises(ValueError,
                          bulk.find({}).replace_one, {'$set': {'foo': 'bar'}})
        bulk.find({}).replace_one({'foo': 'bar'})
        result = bulk.execute()
        self.assertEqualResponse(expected, result)

        self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 1)

        self.coll.delete_many({})
        self.coll.insert_many([{}, {}])

        bulk = self.coll.initialize_unordered_bulk_op()
        bulk.find({}).replace_one({'bim': 'baz'})
        result = bulk.execute()
        self.assertEqualResponse(expected, result)

        self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1)

    def test_remove(self):
        # Test removing all documents, ordered.
        expected = {
            'nMatched': 0,
            'nModified': 0,
            'nUpserted': 0,
            'nInserted': 0,
            'nRemoved': 2,
            'upserted': [],
            'writeErrors': [],
            'writeConcernErrors': []
        }
        self.coll.insert_many([{}, {}])

        bulk = self.coll.initialize_ordered_bulk_op()

        # remove() must be preceded by find().
        self.assertRaises(AttributeError, lambda: bulk.remove())
        bulk.find({}).remove()
        result = bulk.execute()
        self.assertEqualResponse(expected, result)

        self.assertEqual(self.coll.count(), 0)

        # Test removing some documents, ordered.
        self.coll.insert_many([{}, {'x': 1}, {}, {'x': 1}])

        bulk = self.coll.initialize_ordered_bulk_op()

        bulk.find({'x': 1}).remove()
        result = bulk.execute()
        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 0,
             'nInserted': 0,
             'nRemoved': 2,
             'upserted': [],
             'writeErrors': [],
             'writeConcernErrors': []},
            result)

        self.assertEqual(self.coll.count(), 2)
        self.coll.delete_many({})

        # Test removing all documents, unordered.
        self.coll.insert_many([{}, {}])

        bulk = self.coll.initialize_unordered_bulk_op()
        bulk.find({}).remove()
        result = bulk.execute()
        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 0,
             'nInserted': 0,
             'nRemoved': 2,
             'upserted': [],
             'writeErrors': [],
             'writeConcernErrors': []},
            result)

        # Test removing some documents, unordered.
        self.assertEqual(self.coll.count(), 0)

        self.coll.insert_many([{}, {'x': 1}, {}, {'x': 1}])

        bulk = self.coll.initialize_unordered_bulk_op()
        bulk.find({'x': 1}).remove()
        result = bulk.execute()
        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 0,
             'nInserted': 0,
             'nRemoved': 2,
             'upserted': [],
             'writeErrors': [],
             'writeConcernErrors': []},
            result)

        self.assertEqual(self.coll.count(), 2)
        self.coll.delete_many({})

    def test_remove_one(self):

        bulk = self.coll.initialize_ordered_bulk_op()

        # remove_one() must be preceded by find().
        self.assertRaises(AttributeError, lambda: bulk.remove_one())

        # Test removing one document, empty selector.
        # First ordered, then unordered.
        self.coll.insert_many([{}, {}])
        expected = {
            'nMatched': 0,
            'nModified': 0,
            'nUpserted': 0,
            'nInserted': 0,
            'nRemoved': 1,
            'upserted': [],
            'writeErrors': [],
            'writeConcernErrors': []
        }

        bulk.find({}).remove_one()
        result = bulk.execute()
        self.assertEqualResponse(expected, result)

        self.assertEqual(self.coll.count(), 1)

        self.coll.insert_one({})

        bulk = self.coll.initialize_unordered_bulk_op()
        bulk.find({}).remove_one()
        result = bulk.execute()
        self.assertEqualResponse(expected, result)

        self.assertEqual(self.coll.count(), 1)

        # Test removing one document, with a selector.
        # First ordered, then unordered.
        self.coll.insert_one({'x': 1})

        bulk = self.coll.initialize_ordered_bulk_op()
        bulk.find({'x': 1}).remove_one()
        result = bulk.execute()
        self.assertEqualResponse(expected, result)

        self.assertEqual([{}], list(self.coll.find({}, {'_id': False})))
        self.coll.insert_one({'x': 1})

        bulk = self.coll.initialize_unordered_bulk_op()
        bulk.find({'x': 1}).remove_one()
        result = bulk.execute()
        self.assertEqualResponse(expected, result)

        self.assertEqual([{}], list(self.coll.find({}, {'_id': False})))

    def test_upsert(self):
        bulk = self.coll.initialize_ordered_bulk_op()

        # upsert() requires find() first.
        self.assertRaises(
            AttributeError,
            lambda: bulk.upsert())

        expected = {
            'nMatched': 0,
            'nModified': 0,
            'nUpserted': 1,
            'nInserted': 0,
            'nRemoved': 0,
            'upserted': [{'index': 0, '_id': '...'}]
        }

        bulk.find({}).upsert().replace_one({'foo': 'bar'})
        result = bulk.execute()
        self.assertEqualResponse(expected, result)

        bulk = self.coll.initialize_ordered_bulk_op()
        bulk.find({}).upsert().update_one({'$set': {'bim': 'baz'}})
        result = bulk.execute()
        self.assertEqualResponse(
            {'nMatched': 1,
             'nModified': 1,
             'nUpserted': 0,
             'nInserted': 0,
             'nRemoved': 0,
             'upserted': [],
             'writeErrors': [],
             'writeConcernErrors': []},
            result)

        self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1)

        bulk = self.coll.initialize_ordered_bulk_op()
        bulk.find({}).upsert().update({'$set': {'bim': 'bop'}})
        # Non-upsert, no matches.
        bulk.find({'x': 1}).update({'$set': {'x': 2}})
        result = bulk.execute()
        self.assertEqualResponse(
            {'nMatched': 1,
             'nModified': 1,
             'nUpserted': 0,
             'nInserted': 0,
             'nRemoved': 0,
             'upserted': [],
             'writeErrors': [],
             'writeConcernErrors': []},
            result)

        self.assertEqual(self.coll.find({'bim': 'bop'}).count(), 1)
        self.assertEqual(self.coll.find({'x': 2}).count(), 0)

    def test_upsert_large(self):
        big = 'a' * (client_context.client.max_bson_size - 37)
        bulk = self.coll.initialize_ordered_bulk_op()
        bulk.find({'x': 1}).upsert().update({'$set': {'s': big}})
        result = bulk.execute()
        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 1,
             'nInserted': 0,
             'nRemoved': 0,
             'upserted': [{'index': 0, '_id': '...'}]},
            result)

        self.assertEqual(1, self.coll.find({'x': 1}).count())

    def test_client_generated_upsert_id(self):
        batch = self.coll.initialize_ordered_bulk_op()
        batch.find({'_id': 0}).upsert().update_one({'$set': {'a': 0}})
        batch.find({'a': 1}).upsert().replace_one({'_id': 1})
        # This is just here to make the counts right in all cases.
        batch.find({'_id': 2}).upsert().replace_one({'_id': 2})
        result = batch.execute()
        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 3,
             'nInserted': 0,
             'nRemoved': 0,
             'upserted': [{'index': 0, '_id': 0},
                          {'index': 1, '_id': 1},
                          {'index': 2, '_id': 2}]},
            result)

    def test_single_ordered_batch(self):
        batch = self.coll.initialize_ordered_bulk_op()
        batch.insert({'a': 1})
        batch.find({'a': 1}).update_one({'$set': {'b': 1}})
        batch.find({'a': 2}).upsert().update_one({'$set': {'b': 2}})
        batch.insert({'a': 3})
        batch.find({'a': 3}).remove()
        result = batch.execute()
        self.assertEqualResponse(
            {'nMatched': 1,
             'nModified': 1,
             'nUpserted': 1,
             'nInserted': 2,
             'nRemoved': 1,
             'upserted': [{'index': 2, '_id': '...'}]},
            result)

    def test_single_error_ordered_batch(self):
        self.coll.create_index('a', unique=True)
        self.addCleanup(self.coll.drop_index, [('a', 1)])
        batch = self.coll.initialize_ordered_bulk_op()
        batch.insert({'b': 1, 'a': 1})
        batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}})
        batch.insert({'b': 3, 'a': 2})

        try:
            batch.execute()
        except BulkWriteError as exc:
            result = exc.details
            self.assertEqual(exc.code, 65)
        else:
            self.fail("Error not raised")

        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 0,
             'nInserted': 1,
             'nRemoved': 0,
             'upserted': [],
             'writeConcernErrors': [],
             'writeErrors': [
                 {'index': 1,
                  'code': 11000,
                  'errmsg': '...',
                  'op': {'q': {'b': 2},
                         'u': {'$set': {'a': 1}},
                         'multi': False,
                         'upsert': True}}]},
            result)

    def test_multiple_error_ordered_batch(self):
        self.coll.create_index('a', unique=True)
        self.addCleanup(self.coll.drop_index, [('a', 1)])
        batch = self.coll.initialize_ordered_bulk_op()
        batch.insert({'b': 1, 'a': 1})
        batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}})
        batch.find({'b': 3}).upsert().update_one({'$set': {'a': 2}})
        batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}})
        batch.insert({'b': 4, 'a': 3})
        batch.insert({'b': 5, 'a': 1})

        try:
            batch.execute()
        except BulkWriteError as exc:
            result = exc.details
            self.assertEqual(exc.code, 65)
        else:
            self.fail("Error not raised")

        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 0,
             'nInserted': 1,
             'nRemoved': 0,
             'upserted': [],
             'writeConcernErrors': [],
             'writeErrors': [
                 {'index': 1,
                  'code': 11000,
                  'errmsg': '...',
                  'op': {'q': {'b': 2},
                         'u': {'$set': {'a': 1}},
                         'multi': False,
                         'upsert': True}}]},
            result)

    def test_single_unordered_batch(self):
        batch = self.coll.initialize_unordered_bulk_op()
        batch.insert({'a': 1})
        batch.find({'a': 1}).update_one({'$set': {'b': 1}})
        batch.find({'a': 2}).upsert().update_one({'$set': {'b': 2}})
        batch.insert({'a': 3})
        batch.find({'a': 3}).remove()
        result = batch.execute()
        self.assertEqualResponse(
            {'nMatched': 1,
             'nModified': 1,
             'nUpserted': 1,
             'nInserted': 2,
             'nRemoved': 1,
             'upserted': [{'index': 2, '_id': '...'}],
             'writeErrors': [],
             'writeConcernErrors': []},
            result)

    def test_single_error_unordered_batch(self):
        self.coll.create_index('a', unique=True)
        self.addCleanup(self.coll.drop_index, [('a', 1)])
        batch = self.coll.initialize_unordered_bulk_op()
        batch.insert({'b': 1, 'a': 1})
        batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}})
        batch.insert({'b': 3, 'a': 2})

        try:
            batch.execute()
        except BulkWriteError as exc:
            result = exc.details
            self.assertEqual(exc.code, 65)
        else:
            self.fail("Error not raised")

        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 0,
             'nInserted': 2,
             'nRemoved': 0,
             'upserted': [],
             'writeConcernErrors': [],
             'writeErrors': [
                 {'index': 1,
                  'code': 11000,
                  'errmsg': '...',
                  'op': {'q': {'b': 2},
                         'u': {'$set': {'a': 1}},
                         'multi': False,
                         'upsert': True}}]},
            result)

    def test_multiple_error_unordered_batch(self):
        self.coll.create_index('a', unique=True)
        self.addCleanup(self.coll.drop_index, [('a', 1)])
        batch = self.coll.initialize_unordered_bulk_op()
        batch.insert({'b': 1, 'a': 1})
        batch.find({'b': 2}).upsert().update_one({'$set': {'a': 3}})
        batch.find({'b': 3}).upsert().update_one({'$set': {'a': 4}})
        batch.find({'b': 4}).upsert().update_one({'$set': {'a': 3}})
        batch.insert({'b': 5, 'a': 2})
        batch.insert({'b': 6, 'a': 1})

        try:
            batch.execute()
        except BulkWriteError as exc:
            result = exc.details
            self.assertEqual(exc.code, 65)
        else:
            self.fail("Error not raised")
        # Assume the update at index 1 runs before the update at index 3,
        # although the spec does not require it. Same for inserts.
        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 2,
             'nInserted': 2,
             'nRemoved': 0,
             'upserted': [
                 {'index': 1, '_id': '...'},
                 {'index': 2, '_id': '...'}],
             'writeConcernErrors': [],
             'writeErrors': [
                 {'index': 3,
                  'code': 11000,
                  'errmsg': '...',
                  'op': {'q': {'b': 4},
                         'u': {'$set': {'a': 3}},
                         'multi': False,
                         'upsert': True}},
                 {'index': 5,
                  'code': 11000,
                  'errmsg': '...',
                  'op': {'_id': '...', 'b': 6, 'a': 1}}]},
            result)

    @client_context.require_version_max(4, 8)  # PYTHON-2436
    def test_large_inserts_ordered(self):
        big = 'x' * self.coll.database.client.max_bson_size
        batch = self.coll.initialize_ordered_bulk_op()
        batch.insert({'b': 1, 'a': 1})
        batch.insert({'big': big})
        batch.insert({'b': 2, 'a': 2})

        try:
            batch.execute()
        except BulkWriteError as exc:
            result = exc.details
            self.assertEqual(exc.code, 65)
        else:
            self.fail("Error not raised")

        self.assertEqual(1, result['nInserted'])

        self.coll.delete_many({})

        big = 'x' * (1024 * 1024 * 4)
        batch = self.coll.initialize_ordered_bulk_op()
        batch.insert({'a': 1, 'big': big})
        batch.insert({'a': 2, 'big': big})
        batch.insert({'a': 3, 'big': big})
        batch.insert({'a': 4, 'big': big})
        batch.insert({'a': 5, 'big': big})
        batch.insert({'a': 6, 'big': big})
        result = batch.execute()

        self.assertEqual(6, result['nInserted'])
        self.assertEqual(6, self.coll.count())

    def test_large_inserts_unordered(self):
        big = 'x' * self.coll.database.client.max_bson_size
        batch = self.coll.initialize_unordered_bulk_op()
        batch.insert({'b': 1, 'a': 1})
        batch.insert({'big': big})
        batch.insert({'b': 2, 'a': 2})

        try:
            batch.execute()
        except BulkWriteError as exc:
            result = exc.details
            self.assertEqual(exc.code, 65)
        else:
            self.fail("Error not raised")

        self.assertEqual(2, result['nInserted'])

        self.coll.delete_many({})

        big = 'x' * (1024 * 1024 * 4)
        batch = self.coll.initialize_ordered_bulk_op()
        batch.insert({'a': 1, 'big': big})
        batch.insert({'a': 2, 'big': big})
        batch.insert({'a': 3, 'big': big})
        batch.insert({'a': 4, 'big': big})
        batch.insert({'a': 5, 'big': big})
        batch.insert({'a': 6, 'big': big})
        result = batch.execute()

        self.assertEqual(6, result['nInserted'])
        self.assertEqual(6, self.coll.count())

    def test_numerous_inserts(self):
        # Ensure we don't exceed server's 1000-document batch size limit.
        n_docs = 2100
        batch = self.coll.initialize_unordered_bulk_op()
        for _ in range(n_docs):
            batch.insert({})

        result = batch.execute()
        self.assertEqual(n_docs, result['nInserted'])
        self.assertEqual(n_docs, self.coll.count())

        # Same with ordered bulk.
        self.coll.delete_many({})
        batch = self.coll.initialize_ordered_bulk_op()
        for _ in range(n_docs):
            batch.insert({})

        result = batch.execute()
        self.assertEqual(n_docs, result['nInserted'])
        self.assertEqual(n_docs, self.coll.count())

    def test_multiple_execution(self):
        batch = self.coll.initialize_ordered_bulk_op()
        batch.insert({})
        batch.execute()
        self.assertRaises(InvalidOperation, batch.execute)

    def test_generator_insert(self):
        def gen():
            yield {'a': 1, 'b': 1}
            yield {'a': 1, 'b': 2}
            yield {'a': 2, 'b': 3}
            yield {'a': 3, 'b': 5}
            yield {'a': 5, 'b': 8}

        result = self.coll.insert_many(gen())
        self.assertEqual(5, len(result.inserted_ids))


class TestLegacyBulkNoResults(BulkTestBase):

    @classmethod
    def setUpClass(cls):
        super(TestLegacyBulkNoResults, cls).setUpClass()
        cls.deprecation_filter = DeprecationFilter()

    @classmethod
    def tearDownClass(cls):
        cls.deprecation_filter.stop()

    def tearDown(self):
        self.coll.delete_many({})

    def test_no_results_ordered_success(self):
        batch = self.coll.initialize_ordered_bulk_op()
        batch.insert({'_id': 1})
        batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}})
        batch.insert({'_id': 2})
        batch.find({'_id': 1}).remove_one()
        self.assertTrue(batch.execute({'w': 0}) is None)
        wait_until(lambda: 2 == self.coll.count(),
                   'insert 2 documents')
        wait_until(lambda: self.coll.find_one({'_id': 1}) is None,
                   'removed {"_id": 1}')

    def test_no_results_ordered_failure(self):
        batch = self.coll.initialize_ordered_bulk_op()
        batch.insert({'_id': 1})
        batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}})
        batch.insert({'_id': 2})
        # Fails with duplicate key error.
        batch.insert({'_id': 1})
        # Should not be executed since the batch is ordered.
        batch.find({'_id': 1}).remove_one()
        self.assertTrue(batch.execute({'w': 0}) is None)
        wait_until(lambda: 3 == self.coll.count(),
                   'insert 3 documents')
        self.assertEqual({'_id': 1}, self.coll.find_one({'_id': 1}))

    def test_no_results_unordered_success(self):
        batch = self.coll.initialize_unordered_bulk_op()
        batch.insert({'_id': 1})
        batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}})
        batch.insert({'_id': 2})
        batch.find({'_id': 1}).remove_one()
        self.assertTrue(batch.execute({'w': 0}) is None)
        wait_until(lambda: 2 == self.coll.count(),
                   'insert 2 documents')
        wait_until(lambda: self.coll.find_one({'_id': 1}) is None,
                   'removed {"_id": 1}')

    def test_no_results_unordered_failure(self):
        batch = self.coll.initialize_unordered_bulk_op()
        batch.insert({'_id': 1})
        batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}})
        batch.insert({'_id': 2})
        # Fails with duplicate key error.
        batch.insert({'_id': 1})
        # Should be executed since the batch is unordered.
        batch.find({'_id': 1}).remove_one()
        self.assertTrue(batch.execute({'w': 0}) is None)
        wait_until(lambda: 2 == self.coll.count(),
                   'insert 2 documents')
        wait_until(lambda: self.coll.find_one({'_id': 1}) is None,
                   'removed {"_id": 1}')


class TestLegacyBulkWriteConcern(BulkTestBase):

    @classmethod
    def setUpClass(cls):
        super(TestLegacyBulkWriteConcern, cls).setUpClass()
        cls.w = client_context.w
        cls.secondary = None
        if cls.w > 1:
            for member in client_context.hello['hosts']:
                if member != client_context.hello['primary']:
                    cls.secondary = single_client(*partition_node(member))
                    break

        # We tested wtimeout errors by specifying a write concern greater than
        # the number of members, but in MongoDB 2.7.8+ this causes a different
        # sort of error, "Not enough data-bearing nodes". In recent servers we
        # use a failpoint to pause replication on a secondary.
        cls.need_replication_stopped = client_context.version.at_least(2, 7, 8)
        cls.deprecation_filter = DeprecationFilter()

    @classmethod
    def tearDownClass(cls):
        cls.deprecation_filter.stop()
        if cls.secondary:
            cls.secondary.close()

    def cause_wtimeout(self, batch):
        if self.need_replication_stopped:
            if not client_context.test_commands_enabled:
                raise SkipTest("Test commands must be enabled.")

            self.secondary.admin.command('configureFailPoint',
                                         'rsSyncApplyStop',
                                         mode='alwaysOn')

            try:
                return batch.execute({'w': self.w, 'wtimeout': 1})
            finally:
                self.secondary.admin.command('configureFailPoint',
                                             'rsSyncApplyStop',
                                             mode='off')
        else:
            return batch.execute({'w': self.w + 1, 'wtimeout': 1})

    def test_fsync_and_j(self):
        batch = self.coll.initialize_ordered_bulk_op()
        batch.insert({'a': 1})
        self.assertRaises(
            ConfigurationError,
            batch.execute, {'fsync': True, 'j': True})

    @client_context.require_replica_set
    def test_write_concern_failure_ordered(self):
        # Ensure we don't raise on wnote.
        batch = self.coll.initialize_ordered_bulk_op()
        batch.find({"something": "that does no exist"}).remove()
        self.assertTrue(batch.execute({"w": self.w}))

        batch = self.coll.initialize_ordered_bulk_op()
        batch.insert({'a': 1})
        batch.insert({'a': 2})

        # Replication wtimeout is a 'soft' error.
        # It shouldn't stop batch processing.
        try:
            self.cause_wtimeout(batch)
        except BulkWriteError as exc:
            result = exc.details
            self.assertEqual(exc.code, 65)
        else:
            self.fail("Error not raised")

        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 0,
             'nInserted': 2,
             'nRemoved': 0,
             'upserted': [],
             'writeErrors': []},
            result)

        # When talking to legacy servers there will be a
        # write concern error for each operation.
        self.assertTrue(len(result['writeConcernErrors']) > 0)

        failed = result['writeConcernErrors'][0]
        self.assertEqual(64, failed['code'])
        self.assertTrue(isinstance(failed['errmsg'], string_type))

        self.coll.delete_many({})
        self.coll.create_index('a', unique=True)
        self.addCleanup(self.coll.drop_index, [('a', 1)])

        # Fail due to write concern support as well
        # as duplicate key error on ordered batch.
        batch = self.coll.initialize_ordered_bulk_op()
        batch.insert({'a': 1})
        batch.find({'a': 3}).upsert().replace_one({'b': 1})
        batch.insert({'a': 1})
        batch.insert({'a': 2})
        try:
            self.cause_wtimeout(batch)
        except BulkWriteError as exc:
            result = exc.details
            self.assertEqual(exc.code, 65)
        else:
            self.fail("Error not raised")

        self.assertEqualResponse(
            {'nMatched': 0,
             'nModified': 0,
             'nUpserted': 1,
             'nInserted': 1,
             'nRemoved': 0,
             'upserted': [{'index': 1, '_id': '...'}],
             'writeErrors': [
                 {'index': 2,
                  'code': 11000,
                  'errmsg': '...',
                  'op': {'_id': '...', 'a': 1}}]},
            result)

        self.assertTrue(len(result['writeConcernErrors']) > 1)
        failed = result['writeErrors'][0]
        self.assertTrue("duplicate" in failed['errmsg'])

    @client_context.require_replica_set
    def test_write_concern_failure_unordered(self):
        # Ensure we don't raise on wnote.
        batch = self.coll.initialize_unordered_bulk_op()
        batch.find({"something": "that does no exist"}).remove()
        self.assertTrue(batch.execute({"w": self.w}))

        batch = self.coll.initialize_unordered_bulk_op()
        batch.insert({'a': 1})
        batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3, 'b': 1}})
        batch.insert({'a': 2})

        # Replication wtimeout is a 'soft' error.
        # It shouldn't stop batch processing.
        try:
            self.cause_wtimeout(batch)
        except BulkWriteError as exc:
            result = exc.details
            self.assertEqual(exc.code, 65)
        else:
            self.fail("Error not raised")

        self.assertEqual(2, result['nInserted'])
        self.assertEqual(1, result['nUpserted'])
        self.assertEqual(0, len(result['writeErrors']))
        # When talking to legacy servers there will be a
        # write concern error for each operation.
        self.assertTrue(len(result['writeConcernErrors']) > 1)

        self.coll.delete_many({})
        self.coll.create_index('a', unique=True)
        self.addCleanup(self.coll.drop_index, [('a', 1)])

        # Fail due to write concern support as well
        # as duplicate key error on unordered batch.
        batch = self.coll.initialize_unordered_bulk_op()
        batch.insert({'a': 1})
        batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3,
                                                           'b': 1}})
        batch.insert({'a': 1})
        batch.insert({'a': 2})
        try:
            self.cause_wtimeout(batch)
        except BulkWriteError as exc:
            result = exc.details
            self.assertEqual(exc.code, 65)
        else:
            self.fail("Error not raised")

        self.assertEqual(2, result['nInserted'])
        self.assertEqual(1, result['nUpserted'])
        self.assertEqual(1, len(result['writeErrors']))
        # When talking to legacy servers there will be a
        # write concern error for each operation.
        self.assertTrue(len(result['writeConcernErrors']) > 1)

        failed = result['writeErrors'][0]
        self.assertEqual(2, failed['index'])
        self.assertEqual(11000, failed['code'])
        self.assertTrue(isinstance(failed['errmsg'], string_type))
        self.assertEqual(1, failed['op']['a'])

        failed = result['writeConcernErrors'][0]
        self.assertEqual(64, failed['code'])
        self.assertTrue(isinstance(failed['errmsg'], string_type))

        upserts = result['upserted']
        self.assertEqual(1, len(upserts))
        self.assertEqual(1, upserts[0]['index'])
        self.assertTrue(upserts[0].get('_id'))


class TestLegacyBulkAuthorization(BulkAuthorizationTestBase):

    @classmethod
    def setUpClass(cls):
        super(TestLegacyBulkAuthorization, cls).setUpClass()
        cls.deprecation_filter = DeprecationFilter()

    @classmethod
    def tearDownClass(cls):
        cls.deprecation_filter.stop()

    def test_readonly(self):
        # We test that an authorization failure aborts the batch and is raised
        # as OperationFailure.
        cli = rs_or_single_client_noauth()
        db = cli.pymongo_test
        coll = db.test
        db.authenticate('readonly', 'pw')
        bulk = coll.initialize_ordered_bulk_op()
        bulk.insert({'x': 1})
        self.assertRaises(OperationFailure, bulk.execute)

    def test_no_remove(self):
        # We test that an authorization failure aborts the batch and is raised
        # as OperationFailure.
        cli = rs_or_single_client_noauth()
        db = cli.pymongo_test
        coll = db.test
        db.authenticate('noremove', 'pw')
        bulk = coll.initialize_ordered_bulk_op()
        bulk.insert({'x': 1})
        bulk.find({'x': 2}).upsert().replace_one({'x': 2})
        bulk.find({}).remove()  # Prohibited.
        bulk.insert({'x': 3})   # Never attempted.
        self.assertRaises(OperationFailure, bulk.execute)
        self.assertEqual(set([1, 2]), set(self.coll.distinct('x')))

if __name__ == "__main__":
    unittest.main()