Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
pymongo / test / test_common.py
Size: Mime:
# Copyright 2011-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Test the pymongo common module."""

import sys
import uuid

sys.path[0:0] = [""]

from bson.binary import UUIDLegacy, PYTHON_LEGACY, STANDARD
from bson.code import Code
from bson.codec_options import CodecOptions
from bson.objectid import ObjectId
from pymongo.errors import OperationFailure
from pymongo.write_concern import WriteConcern
from test import client_context, unittest, IntegrationTest
from test.utils import connected, rs_or_single_client, single_client


@client_context.require_connection
def setUpModule():
    pass


class TestCommon(IntegrationTest):

    def test_uuid_representation(self):
        coll = self.db.uuid
        coll.drop()

        # Test property
        self.assertEqual(PYTHON_LEGACY,
                         coll.codec_options.uuid_representation)

        # Test basic query
        uu = uuid.uuid4()
        # Insert as binary subtype 3
        coll.insert_one({'uu': uu})
        self.assertEqual(uu, coll.find_one({'uu': uu})['uu'])
        coll = self.db.get_collection(
            "uuid", CodecOptions(uuid_representation=STANDARD))
        self.assertEqual(STANDARD, coll.codec_options.uuid_representation)
        self.assertEqual(None, coll.find_one({'uu': uu}))
        self.assertEqual(uu, coll.find_one({'uu': UUIDLegacy(uu)})['uu'])

        # Test count_documents
        self.assertEqual(0, coll.count_documents({'uu': uu}))
        coll = self.db.get_collection(
            "uuid", CodecOptions(uuid_representation=PYTHON_LEGACY))
        self.assertEqual(1, coll.count_documents({'uu': uu}))

        # Test delete
        coll = self.db.get_collection(
            "uuid", CodecOptions(uuid_representation=STANDARD))
        coll.delete_one({'uu': uu})
        self.assertEqual(1, coll.count_documents({}))
        coll = self.db.get_collection(
            "uuid", CodecOptions(uuid_representation=PYTHON_LEGACY))
        coll.delete_one({'uu': uu})
        self.assertEqual(0, coll.count_documents({}))

        # Test update_one
        coll.insert_one({'_id': uu, 'i': 1})
        coll = self.db.get_collection(
            "uuid", CodecOptions(uuid_representation=STANDARD))
        coll.update_one({'_id': uu}, {'$set': {'i': 2}})
        coll = self.db.get_collection(
            "uuid", CodecOptions(uuid_representation=PYTHON_LEGACY))
        self.assertEqual(1, coll.find_one({'_id': uu})['i'])
        coll.update_one({'_id': uu}, {'$set': {'i': 2}})
        self.assertEqual(2, coll.find_one({'_id': uu})['i'])

        # Test Cursor.distinct
        self.assertEqual([2], coll.find({'_id': uu}).distinct('i'))
        coll = self.db.get_collection(
            "uuid", CodecOptions(uuid_representation=STANDARD))
        self.assertEqual([], coll.find({'_id': uu}).distinct('i'))

        # Test findAndModify
        self.assertEqual(None, coll.find_one_and_update({'_id': uu},
                                                        {'$set': {'i': 5}}))
        coll = self.db.get_collection(
            "uuid", CodecOptions(uuid_representation=PYTHON_LEGACY))
        self.assertEqual(2, coll.find_one_and_update({'_id': uu},
                                                     {'$set': {'i': 5}})['i'])
        self.assertEqual(5, coll.find_one({'_id': uu})['i'])

        # Test command
        self.assertEqual(5, self.db.command('findAndModify', 'uuid',
                                            update={'$set': {'i': 6}},
                                            query={'_id': uu})['value']['i'])
        self.assertEqual(6, self.db.command(
            'findAndModify', 'uuid',
            update={'$set': {'i': 7}},
            query={'_id': UUIDLegacy(uu)})['value']['i'])

        # Test (inline)_map_reduce
        coll.drop()
        coll.insert_one({"_id": uu, "x": 1, "tags": ["dog", "cat"]})
        coll.insert_one({"_id": uuid.uuid4(), "x": 3,
                         "tags": ["mouse", "cat", "dog"]})

        map = Code("function () {"
                   "  this.tags.forEach(function(z) {"
                   "    emit(z, 1);"
                   "  });"
                   "}")

        reduce = Code("function (key, values) {"
                      "  var total = 0;"
                      "  for (var i = 0; i < values.length; i++) {"
                      "    total += values[i];"
                      "  }"
                      "  return total;"
                      "}")

        coll = self.db.get_collection(
            "uuid", CodecOptions(uuid_representation=STANDARD))
        q = {"_id": uu}
        result = coll.inline_map_reduce(map, reduce, query=q)
        self.assertEqual([], result)

        result = coll.map_reduce(map, reduce, "results", query=q)
        self.assertEqual(0, self.db.results.count_documents({}))

        coll = self.db.get_collection(
            "uuid", CodecOptions(uuid_representation=PYTHON_LEGACY))
        q = {"_id": uu}
        result = coll.inline_map_reduce(map, reduce, query=q)
        self.assertEqual(2, len(result))

        result = coll.map_reduce(map, reduce, "results", query=q)
        self.assertEqual(2, self.db.results.count_documents({}))

        self.db.drop_collection("result")
        coll.drop()

    def test_write_concern(self):
        c = rs_or_single_client(connect=False)
        self.assertEqual(WriteConcern(), c.write_concern)

        c = rs_or_single_client(connect=False, w=2, wtimeout=1000)
        wc = WriteConcern(w=2, wtimeout=1000)
        self.assertEqual(wc, c.write_concern)

        # Can we override back to the server default?
        db = c.get_database('pymongo_test', write_concern=WriteConcern())
        self.assertEqual(db.write_concern, WriteConcern())

        db = c.pymongo_test
        self.assertEqual(wc, db.write_concern)
        coll = db.test
        self.assertEqual(wc, coll.write_concern)

        cwc = WriteConcern(j=True)
        coll = db.get_collection('test', write_concern=cwc)
        self.assertEqual(cwc, coll.write_concern)
        self.assertEqual(wc, db.write_concern)

    def test_mongo_client(self):
        pair = client_context.pair
        m = rs_or_single_client(w=0)
        coll = m.pymongo_test.write_concern_test
        coll.drop()
        doc = {"_id": ObjectId()}
        coll.insert_one(doc)
        self.assertTrue(coll.insert_one(doc))
        coll = coll.with_options(write_concern=WriteConcern(w=1))
        self.assertRaises(OperationFailure, coll.insert_one, doc)

        m = rs_or_single_client()
        coll = m.pymongo_test.write_concern_test
        new_coll = coll.with_options(write_concern=WriteConcern(w=0))
        self.assertTrue(new_coll.insert_one(doc))
        self.assertRaises(OperationFailure, coll.insert_one, doc)

        m = rs_or_single_client("mongodb://%s/" % (pair,),
                                replicaSet=client_context.replica_set_name)

        coll = m.pymongo_test.write_concern_test
        self.assertRaises(OperationFailure, coll.insert_one, doc)
        m = rs_or_single_client("mongodb://%s/?w=0" % (pair,),
                                replicaSet=client_context.replica_set_name)

        coll = m.pymongo_test.write_concern_test
        coll.insert_one(doc)

        # Equality tests
        direct = connected(single_client(w=0))
        direct2 = connected(single_client("mongodb://%s/?w=0" % (pair,),
                                          **self.credentials))
        self.assertEqual(direct, direct2)
        self.assertFalse(direct != direct2)


if __name__ == "__main__":
    unittest.main()