Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
mongokit / tests / test_api.py
Size: Mime:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2009-2011, Nicolas Clairon
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the University of California, Berkeley nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import unittest

from mongokit import *
from bson.objectid import ObjectId
from pymongo import ReadPreference


class ApiTestCase(unittest.TestCase):
    def setUp(self):
        self.connection = Connection()
        self.col = self.connection['test']['mongokit']

    def tearDown(self):
        self.connection.drop_database('test')
        self.connection.drop_database('othertest')

    def test_save(self):
        class MyDoc(Document):
            structure = {
                "bla":{
                    "foo":unicode,
                    "bar":int,
                },
                "spam":[],
            }
        self.connection.register([MyDoc])
        mydoc = self.col.MyDoc()
        mydoc["bla"]["foo"] = u"bar"
        mydoc["bla"]["bar"] = 42
        mydoc.save()
        assert isinstance(mydoc['_id'], ObjectId)

        saved_doc = self.col.find_one({"bla.bar":42})
        for key, value in mydoc.iteritems():
            assert saved_doc[key] == value

        mydoc = self.col.MyDoc()
        mydoc["bla"]["foo"] = u"bar"
        mydoc["bla"]["bar"] = 43
        mydoc.save(uuid=True)
        assert isinstance(mydoc['_id'], unicode)
        assert mydoc['_id'].startswith("MyDoc"), id

        saved_doc = self.col.find_one({"bla.bar":43})
        for key, value in mydoc.iteritems():
            assert saved_doc[key] == value

    def test_save_without_collection(self):
        class MyDoc(Document):
            structure = {
                "foo":int,
            }
        self.connection.register([MyDoc])
        mydoc = MyDoc()
        mydoc["foo"] = 1
        self.assertRaises(ConnectionError, mydoc.save)

    def test_delete(self):
        class MyDoc(Document):
            structure = {
                "foo":int,
            }
        self.connection.register([MyDoc])
        mydoc = self.col.MyDoc()
        mydoc['_id'] = 'foo'
        mydoc["foo"] = 1
        mydoc.save()
        assert self.col.MyDoc.find().count() == 1
        mydoc = self.col.MyDoc.get_from_id('foo')
        assert mydoc['foo'] == 1
        mydoc.delete()
        assert self.col.MyDoc.find().count() == 0

    def test_generate_skeleton(self):
        class A(SchemaDocument):
            structure = {
                "a":{"foo":int},
                "bar":unicode
            }
        a = A(gen_skel=False)
        assert a == {}
        a.generate_skeleton()
        assert a == {"a":{"foo":None}, "bar":None}, a

    def test_generate_skeleton2(self):
        class A(SchemaDocument):
            structure = {
                "a":{"foo":[int]},
                "bar":{unicode:{"egg":int}}
            }
        a = A(gen_skel=False)
        assert a == {}
        a.generate_skeleton()
        assert a == {"a":{"foo":[]}, "bar":{}}, a

    def test_generate_skeleton3(self):
        class A(SchemaDocument):
            structure = {
                "a":{"foo":[int], "spam":{"bla":unicode}},
                "bar":{unicode:{"egg":int}}
            }
        a = A(gen_skel=False)
        assert a == {}
        a.generate_skeleton()
        assert a == {"a":{"foo":[], "spam":{"bla":None}}, "bar":{}}, a

    def test_get_from_id(self):
        class MyDoc(Document):
            structure = {
                "foo":int,
            }
        self.connection.register([MyDoc])
        mydoc = self.col.MyDoc()
        mydoc["_id"] = "bar"
        mydoc["foo"] = 3
        mydoc.save()
        fetched_doc = self.col.MyDoc.get_from_id("bar")
        assert mydoc == fetched_doc
        assert callable(fetched_doc) is False
        assert isinstance(fetched_doc, MyDoc)
        raw_doc = self.col.get_from_id('bar')
        assert mydoc == raw_doc
        assert not isinstance(raw_doc, MyDoc)

    def test_find(self):
        class MyDoc(Document):
            structure = {
                "foo":int,
                "bar":{"bla":int},
            }
        self.connection.register([MyDoc])
        for i in range(10):
            mydoc = self.col.MyDoc()
            mydoc["foo"] = i
            mydoc["bar"]['bla'] = i
            mydoc.save()
        for i in self.col.MyDoc.find({"foo":{"$gt":4}}):
            assert isinstance(i, MyDoc), (i, type(i))
        docs_list = [i["foo"] for i in self.col.MyDoc.find({"foo":{"$gt":4}})]
        assert docs_list == [5,6,7,8,9]
        # using limit/count
        assert self.col.MyDoc.find().count() == 10, self.col.MyDoc.find().count()
        assert self.col.MyDoc.find().limit(1).count() == 10, self.col.MyDoc.find().limit(1).count()
        assert self.col.MyDoc.find().where('this.foo').count() == 9 #{'foo':0} is not taken
        assert self.col.MyDoc.find().where('this.bar.bla').count() == 9 #{'foo':0} is not taken
        assert self.col.MyDoc.find().hint([('foo', 1)])
        assert [i['foo'] for i in self.col.MyDoc.find().sort('foo', -1)] == [9,8,7,6,5,4,3,2,1,0]
        allPlans = self.col.MyDoc.find().explain()['allPlans']
        allPlans[0].pop(u'indexBounds', None)
        allPlans[0].pop(u'indexOnly', None)
        allPlans[0].pop(u'nChunkSkips', None)
        allPlans[0].pop(u'scanAndOrder', None)
        allPlans[0].pop(u'isMultiKey', None)
        self.assertEqual(
            allPlans,
            [
                {
                    u'cursor': u'BasicCursor',
                    u'nscannedObjects': 10,
                    u'nscanned': 10,
                    u'n': 10,
                },
            ],
        )
        next_doc =  self.col.MyDoc.find().sort('foo',1).next()
        assert callable(next_doc) is False
        assert isinstance(next_doc, MyDoc)
        assert next_doc['foo'] == 0
        assert len(list(self.col.MyDoc.find().skip(3))) == 7, len(list(self.col.MyDoc.find().skip(3)))
        from mongokit.cursor import Cursor
        assert isinstance(self.col.MyDoc.find().skip(3), Cursor)

    def test_find_one(self):
        class MyDoc(Document):
            structure = {
                "foo":int
            }
        self.connection.register([MyDoc])
        assert self.col.MyDoc.find_one() is None
        mydoc = self.col.MyDoc()
        mydoc['foo'] = 0
        mydoc.save()
        mydoc = self.col.MyDoc.find_one()
        assert mydoc["foo"] == 0
        assert isinstance(mydoc, MyDoc)
        for i in range(10):
            mydoc = self.col.MyDoc()
            mydoc["foo"] = i
            mydoc.save()
        one_doc = self.col.MyDoc.find_one()
        assert callable(one_doc) is False
        raw_mydoc = self.col.find_one()
        assert one_doc == raw_mydoc

    def test_find_and_modify_query_fails(self):
        class MyDoc(Document):
            structure = {
                "baz":int
            }
        self.connection.register([MyDoc])
        assert self.col.MyDoc.find_and_modify(query={"baz": 1}, update={"$set": {"baz": 2}}) is None

    def test_find_and_modify_query_succeeds(self):
        class MyDoc(Document):
            structure = {
                "baz":int
            }
        self.connection.register([MyDoc])
        mydoc = self.col.MyDoc()
        mydoc["baz"] = 1
        mydoc.save()

        mydoc = self.col.MyDoc.find_and_modify(query={"baz": 1}, update={"$set": {"baz": 2}}, new=True)
        assert isinstance(mydoc, MyDoc)
        self.assertEquals(2, mydoc["baz"])

    def test_one(self):
        class MyDoc(Document):
            structure = {
                "foo":int
            }
        self.connection.register([MyDoc])
        mydoc = self.col.MyDoc()
        mydoc['foo'] = 0
        mydoc.save()
        raw_mydoc = self.col.one()
        mydoc = self.col.MyDoc.one()
        assert callable(mydoc) is False
        assert mydoc == raw_mydoc
        assert mydoc["foo"] == 0
        assert isinstance(mydoc, MyDoc)
        for i in range(10):
            mydoc = self.col.MyDoc()
            mydoc["foo"] = i
            mydoc.save()
        self.assertRaises(MultipleResultsFound, self.col.MyDoc.one)
        self.assertRaises(MultipleResultsFound, self.col.one)

    def test_find_random(self):
        class MyDoc(Document):
            structure = {
                "foo":int
            }
        self.connection.register([MyDoc])
        assert self.col.find_random() is None
        assert self.col.MyDoc.find_random() is None
        for i in range(50):
            mydoc = self.col.MyDoc()
            mydoc["foo"] = i
            mydoc.save()
        raw_mydoc = self.col.find_random()
        mydoc = self.col.MyDoc.find_random()
        assert callable(mydoc) is False
        assert isinstance(mydoc, MyDoc)
        assert mydoc != raw_mydoc, (mydoc, raw_mydoc)

    def test_fetch(self):
        class DocA(Document):
            structure = {
                "doc_a":{'foo':int},
            }
        self.connection.register([DocA])
        class DocB(Document):
            structure = {
                "doc_b":{"bar":int},
            }
        self.connection.register([DocB])
        # creating DocA
        for i in range(10):
            mydoc = self.col.DocA()
            mydoc['doc_a']["foo"] = i
            mydoc.save()
        # creating DocB
        for i in range(5):
            mydoc = self.col.DocB()
            mydoc['doc_b']["bar"] = i
            mydoc.save()

        # all get all documents present in the collection (ie: 15 here)
        assert self.col.DocA.find().count() == 15

        # fetch get only the corresponding documents:
        assert self.col.DocA.fetch().count() == 10, self.col.DocA.fetch().count()
        assert self.col.DocB.fetch().count() == 5
        index = 0
        for doc in self.col.DocA.fetch():
            assert doc == {'_id':doc['_id'], 'doc_a':{'foo':index}}, doc
            assert callable(doc) is False
            index += 1

        #assert DocA.fetch().limit(12).count() == 10, DocA.fetch().limit(1).count() # ???
        assert self.col.DocA.fetch().where('this.doc_a.foo > 3').count() == 6

    def test_fetch_with_query(self):
        class DocA(Document):
            structure = {
                "bar":unicode,
                "doc_a":{'foo':int},
            }
        class DocB(Document):
            structure = {
                "bar":unicode,
                "doc_b":{"bar":int},
            }
        self.connection.register([DocA, DocB])

        # creating DocA
        for i in range(10):
            mydoc = self.col.DocA()
            if i % 2 == 0:
                mydoc['bar'] = u"spam"
            else:
                mydoc['bar'] = u"egg"
            mydoc['doc_a']["foo"] = i
            mydoc.save()
        # creating DocB
        for i in range(5):
            mydoc = self.col.DocB()
            if i % 2 == 0:
                mydoc['bar'] = u"spam"
            else:
                mydoc['bar'] = u"egg"
            mydoc['doc_b']["bar"] = i
            mydoc.save()

        # all get all documents present in the collection (ie: 15 here)
        assert self.col.DocA.find().count() == 15
        assert self.col.DocA.fetch().count() == 10, self.col.DocA.fetch().count()
        assert self.col.DocB.fetch().count() == 5

        assert self.col.DocA.fetch({'bar':'spam'}).count() == 5
        assert self.col.DocB.fetch({'bar':'spam'}).count() == 3

    def test_fetch_inheritance(self):
        class Doc(Document):
            structure = {
                "doc":{'bla':int},
            }
        class DocA(Doc):
            structure = {
                "doc_a":{'foo':int},
            }
        class DocB(DocA):
            structure = {
                "doc_b":{"bar":int},
            }
        self.connection.register([Doc, DocA, DocB])
        # creating DocA
        for i in range(10):
            mydoc = self.col.DocA()
            mydoc['doc']["bla"] = i+1
            mydoc['doc_a']["foo"] = i
            mydoc.save()
        # creating DocB
        for i in range(5):
            mydoc = self.col.DocB()
            mydoc['doc']["bla"] = i+1
            mydoc['doc_a']["foo"] = i
            mydoc['doc_b']["bar"] = i+2
            mydoc.save()

        # all get all documents present in the collection (ie: 15 here)
        assert self.col.DocA.find().count() == 15

        # fetch get only the corresponding documents:
        # DocB is a subclass of DocA and have all fields of DocA so
        # we get all doc here
        assert self.col.DocA.fetch().count() == 15, self.col.DocA.fetch().count()
        # but only the DocB as DocA does not have a 'doc_a' field
        assert self.col.DocB.fetch().count() == 5
        index = 0
        for doc in self.col.DocB.fetch():
            assert doc == {'_id':doc['_id'], 'doc_a':{'foo':index}, 'doc':{'bla':index+1}, "doc_b":{'bar':index+2}}, (doc, index)
            index += 1

        #assert DocA.fetch().limit(12).count() == 10, DocA.fetch().limit(1).count() # ???
        assert self.col.DocA.fetch().where('this.doc_a.foo > 3').count() == 7

    def test_fetch_one(self):
        class DocA(Document):
            structure = {
                "doc_a":{'foo':int},
            }
        class DocB(DocA):
            structure = {
                "doc_b":{"bar":int},
            }
        self.connection.register([DocA, DocB])

        # creating DocA
        mydoc = self.col.DocA()
        mydoc['doc_a']["foo"] = 1
        mydoc.save()
        # creating DocB
        mydoc = self.col.DocB()
        mydoc['doc_b']["bar"] = 2
        mydoc.save()

        docb = self.col.DocB.fetch_one()
        assert callable(docb) is False
        assert docb
        assert isinstance(docb, DocB)
        self.assertRaises(MultipleResultsFound, self.col.DocA.fetch_one)

    def test_query_with_passing_collection(self):
        class MyDoc(Document):
            structure = {
                'foo':int,
            }
        self.connection.register([MyDoc])

        mongokit = self.connection.test.mongokit

        # boostraping
        for i in range(10):
            mydoc = mongokit.MyDoc()
            mydoc['_id'] = unicode(i)
            mydoc['foo'] = i
            mydoc.save()

        # get_from_id
        fetched_doc = mongokit.MyDoc.get_from_id('4')
        assert fetched_doc.collection == mongokit

        # all
        fetched_docs = mongokit.MyDoc.find({'foo':{'$gt':2}})
        assert fetched_docs.count() == 7
        for doc in fetched_docs:
            assert doc.collection == mongokit

        # one
        doc = mongokit.MyDoc.fetch_one({'foo':2})
        assert doc.collection == mongokit

        # fetch
        fetched_docs = mongokit.MyDoc.fetch({'foo':{'$gt':2}})
        assert fetched_docs.count() == 7
        for doc in fetched_docs:
            assert doc.collection == mongokit

        # fetch_one
        doc = mongokit.MyDoc.fetch_one({'foo':2})
        assert doc.collection == mongokit

    def test_skip_validation(self):
        class DocA(Document):
            structure = {
                "doc_a":{'foo':int},
            }
        self.connection.register([DocA])

        # creating DocA
        mydoc = self.col.DocA()
        mydoc['doc_a']["foo"] = u'bar'
        assertion = False
        try:
            mydoc.save()
        except SchemaTypeError:
            assertion = True
        assert assertion
        mydoc.save(validate=False)

        DocA.skip_validation = True

        # creating DocA
        mydoc = self.col.DocA()
        mydoc['doc_a']["foo"] = u'foo'
        self.assertRaises(SchemaTypeError, mydoc.save, validate=True)
        mydoc.save()


    def test_connection(self):
        class DocA(Document):
            structure = {
                "doc_a":{'foo':int},
            }
        self.connection.register([DocA])
        assertion = True
        try:
            DocA.connection
        except AttributeError:
            assertion = True
        assert assertion
        try:
            DocA.db
        except AttributeError:
            assertion = True
        assert assertion
        try:
            DocA.collection
        except AttributeError:
            assertion = True
        assert assertion

        assert self.col.DocA.connection == Connection("localhost", 27017)
        assert self.col.DocA.collection == Connection("localhost", 27017)['test']['mongokit']
        assert self.col.DocA.db == Connection("localhost", 27017)['test']

    def test_all_with_dynamic_collection(self):
        class Section(Document):
            structure = {"section":int}
        self.connection.register([Section])

        s = self.connection.test.section.Section()
        s['section'] = 1
        s.save()

        s = self.connection.test.section.Section()
        s['section'] = 2
        s.save()

        s = self.connection.test.other_section.Section()
        s['section'] = 1
        s.save()

        s = self.connection.test.other_section.Section()
        s['section'] = 2
        s.save()


        sect_col = self.connection.test.section
        sects = [s.collection.name == 'section' and s.db.name == 'test' for s in sect_col.Section.find({})]
        assert len(sects) == 2, len(sects)
        assert any(sects)
        sects = [s.collection.name == 'section' and s.db.name == 'test' for s in sect_col.Section.fetch()]
        assert len(sects) == 2
        assert any(sects)

        sect_col = self.connection.test.other_section
        sects = [s.collection.name == 'other_section' and s.db.name == 'test' for s in sect_col.Section.find({})]
        assert len(sects) == 2
        assert any(sects)
        sects = [s.collection.name == 'other_section' and s.db.name == 'test' for s in sect_col.Section.fetch()]
        assert len(sects) == 2
        assert any(sects)

    def test_get_collection_with_connection(self):
        class Section(Document):
            structure = {"section":int}
        connection = Connection('localhost')
        connection.register([Section])
        col = connection.test.mongokit
        assert col.database.connection == col.Section.connection
        assert col.database.name == 'test' == col.Section.db.name
        assert col.name == 'mongokit' == col.Section.collection.name

    def test_get_size(self):
        class MyDoc(Document):
            structure = {
                "doc":{"foo":int, "bla":unicode},
            }
        self.connection.register([MyDoc])

        mydoc = self.col.MyDoc()
        mydoc['doc']['foo'] = 3
        mydoc['doc']['bla'] = u'bla bla'
        assert mydoc.get_size() == 41, mydoc.get_size()

        mydoc['doc']['bla'] = u'bla bla'+'b'*12
        assert mydoc.get_size() == 41+12

        mydoc.validate()

        mydoc['doc']['bla'] = u'b'*40000000
        self.assertRaises(MaxDocumentSizeError, mydoc.validate)

    def test_get_with_no_wrap(self):
        class MyDoc(Document):
            structure = {"foo":int}
        self.connection.register([MyDoc])

        for i in xrange(2000):
            mydoc = self.col.MyDoc()
            mydoc['foo'] = i
            mydoc.save()

        import time
        start = time.time()
        wrapped_mydocs = [i for i in self.col.MyDoc.find()]
        end = time.time()
        wrap_time = end-start

        start = time.time()
        mydocs = [i for i in self.col.find().sort('foo', -1)]
        end = time.time()
        no_wrap_time = end-start

        assert no_wrap_time < wrap_time

        assert isinstance(wrapped_mydocs[0], MyDoc)
        assert not isinstance(mydocs[0], MyDoc), type(mydocs[0])
        assert [i['foo'] for i in mydocs] == list(reversed(range(2000))), [i['foo'] for i in mydocs]
        assert mydocs[0]['foo'] == 1999, mydocs[0]['foo']

        assert not isinstance(self.col.find().sort('foo', -1).next(), MyDoc)

    def test_get_dbref(self):
        class MyDoc(Document):
            structure = {"foo":int}
        self.connection.register([MyDoc])

        mydoc = self.col.MyDoc()
        mydoc['_id'] = u'1'
        mydoc['foo'] = 1
        mydoc.save()

        mydoc = self.connection.test.othercol.MyDoc()
        mydoc['_id'] = u'2'
        mydoc['foo'] = 2
        mydoc.save()

        mydoc = self.connection.othertest.mongokit.MyDoc()
        mydoc['_id'] = u'3'
        mydoc['foo'] = 3
        mydoc.save()

        mydoc = self.col.MyDoc.find_one({'foo':1})
        assert mydoc.get_dbref(), DBRef(u'mongokit', u'1', u'test')

        mydoc = self.connection.test.othercol.MyDoc.find_one({'foo':2})
        assert mydoc.get_dbref() == DBRef(u'othercol', u'2', u'test')

        mydoc = self.connection.othertest.mongokit.MyDoc.find_one({'foo':3})
        assert mydoc.get_dbref() == DBRef(u'mongokit', u'3', u'othertest')

    def test__hash__(self):
        class MyDoc(Document):
            structure = {"foo":int}
        self.connection.register([MyDoc])

        mydoc = self.col.MyDoc()
        mydoc['foo'] = 1
        self.assertRaises(TypeError, hash, mydoc)

        mydoc.save()
        hash(mydoc)

    def test_non_callable(self):
        class MyDoc(Document):
            structure = {"foo":int}
        self.connection.register([MyDoc])
        mydoc = self.col.MyDoc()
        self.assertRaises(TypeError, mydoc)
        assert callable(mydoc) is False


    def test_bad_call(self):
        class MyDoc(Document):
            structure = {"foo":int}
        self.assertRaises(TypeError, self.connection.test.col.MyDoc)
        self.connection.register([MyDoc])
        self.connection.test.col.MyDoc()
        self.assertRaises(TypeError, self.connection.test.col.Bla)
        self.assertRaises(TypeError, self.connection.test.Bla)


    def test_fetched_dot_notation(self):
        class MyDoc(Document):
            use_dot_notation = True
            structure = {
                "foo":int,
                "bar":{"egg":unicode},
                "toto":{"spam":{"bla":int}}
            }

        self.connection.register([MyDoc])
        mydoc = self.col.MyDoc()
        mydoc.foo = 3
        mydoc.bar.egg = u'bla'
        mydoc.toto.spam.bla = 7
        mydoc.save()
        fetched_doc = self.col.MyDoc.find_one()
        assert fetched_doc.foo == 3, fetched_doc.foo
        assert fetched_doc.bar.egg == "bla", fetched_doc.bar.egg
        self.assertEqual(fetched_doc.toto.spam.bla, 7)


    def test_validate_doc_with_field_added_after_save(self):
        class Doc(Document):
           structure = {
               "foo": unicode,
           }
        self.connection.register([Doc])

        doc = self.col.Doc()
        doc['foo'] = u"bla"
        doc.save()
        doc['bar'] = 2
        self.assertRaises(StructureError, doc.validate)

    def test_distinct(self):
        class Doc(Document):
            structure = {
                "foo": unicode,
                "bla": int
            }
        self.connection.register([Doc])

        for i in range(15):
            if i % 2 == 0:
                foo = u"blo"
            else:
                foo = u"bla"
            doc = self.col.Doc(doc={'foo':foo, 'bla':i})
            doc.save()
        assert self.col.find().distinct('foo') == ['blo', 'bla']
        assert self.col.find().distinct('bla') == range(15)

    def test_explain(self):
        class MyDoc(Document):
            structure = {
                "foo":int,
                "bar":{"bla":int},
            }
        self.connection.register([MyDoc])
        for i in range(10):
            mydoc = self.col.MyDoc()
            mydoc["foo"] = i
            mydoc["bar"]['bla'] = i
            mydoc.save()
        explain1 = self.col.MyDoc.find({"foo":{"$gt":4}}).explain()
        explain2 = self.col.find({'foo':{'gt':4}}).explain()
        explain1.pop('n')
        explain2.pop('n')
        explain1['allPlans'][0].pop('n')
        explain1.pop('stats', None)
        explain2['allPlans'][0].pop('n')
        explain2.pop('stats', None)
        self.assertEqual(explain1, explain2)

    def test_with_long(self):
        class Doc(Document):
            structure = {
                "foo":OR(int, long),
                "bar":unicode,
            }
        self.connection.register([Doc])
        doc = self.col.Doc()
        doc['foo'] = 12L
        doc.save()
        fetch_doc = self.col.Doc.find_one()
        fetch_doc['bar'] = u'egg'
        fetch_doc.save()

    def test_skip_validation_with_required_field(self):
        class Task(Document):
            structure = {
                'extra' : unicode,
            }
            required_fields = ['extra']
            skip_validation = True
        self.connection.register([Task])
        task = self.col.Task()
        task['extra'] = u'foo'
        task.validate()

    def test_passing_collection_in_argument(self):
        class MyDoc(Document):
            structure = {
                'foo':unicode
            }
        doc = MyDoc(collection=self.col)
        doc['foo'] = u'bla'
        doc.save()

    def test_reload(self):
        class MyDoc(Document):
            structure = {
                'foo':{
                    'bar':unicode,
                    'eggs':{'spam':int},
                },
                'bla':unicode
            }
        self.connection.register([MyDoc])

        doc = self.col.MyDoc()
        self.assertRaises(KeyError, doc.reload)
        doc['_id'] = 3
        doc['foo']['bar'] = u'mybar'
        doc['foo']['eggs']['spam'] = 4
        doc['bla'] = u'ble'
        self.assertRaises(OperationFailure, doc.reload)
        doc.save()

        doc['bla'] = u'bli'

        self.col.update({'_id':doc['_id']}, {'$set':{'foo.eggs.spam':2}})

        doc.reload()
        assert doc == {'_id': 3, 'foo': {u'eggs': {u'spam': 2}, u'bar': u'mybar'}, 'bla': u'ble'}

    def test_rewind(self):
        class MyDoc(Document):
            structure = {
                'foo':int,
            }
        self.connection.register([MyDoc])

        for i in range(10):
            doc = self.col.MyDoc()
            doc['foo'] = i
            doc.save()

        cur = self.col.MyDoc.find()
        for i in cur:
            assert isinstance(i, MyDoc), type(MyDoc)
        try:
            cur.next()
        except StopIteration:
            pass
        cur.rewind()
        for i in cur:
            assert isinstance(i, MyDoc), type(MyDoc)
        for i in cur.rewind():
            assert isinstance(i, MyDoc), type(MyDoc)


    def test_decorator(self):
        @self.connection.register
        class MyDoc(Document):
            structure = {
                'foo':int,
            }

        mydoc = self.col.MyDoc()
        mydoc['foo'] = 3
        mydoc.save()

        raw_doc = self.col.MyDoc.find_one()
        self.assertEqual(raw_doc['foo'], 3)
        assert isinstance(raw_doc, MyDoc)


    def test_collection_name_filled(self):

        @self.connection.register
        class MyDoc(Document):
            __collection__ = 'mydoc'
            structure = {
                'foo':int,
            }

        mydoc = self.connection.test.MyDoc()
        mydoc['foo'] = 3
        mydoc.save()
        self.assertEqual(mydoc.collection.name, 'mydoc')

        raw_doc = self.connection.test.MyDoc.find_one()
        self.assertEqual(self.col.MyDoc.find_one(), None)
        self.assertEqual(raw_doc['foo'], 3)
        self.assertEqual(raw_doc, mydoc)
        assert isinstance(raw_doc, MyDoc)

        mydoc = self.col.MyDoc()
        mydoc['foo'] = 3
        mydoc.save()
        self.assertEqual(mydoc.collection.name, 'mongokit')

        raw_doc = self.col.MyDoc.find_one()
        self.assertEqual(raw_doc['foo'], 3)
        self.assertEqual(raw_doc, mydoc)
        assert isinstance(raw_doc, MyDoc)

    def test_database_name_filled(self):

        failed = False
        @self.connection.register
        class MyDoc(Document):
            __database__ = 'mydoc'
            structure = {
                'foo':int,
            }
        try:
            doc = self.connection.MyDoc()
        except AttributeError, e:
            failed = True
            self.assertEqual(str(e), 'MyDoc: __collection__ attribute not '
              'found. You cannot specify the `__database__` attribute '
              'without the `__collection__` attribute')
        self.assertEqual(failed, True)

        @self.connection.register
        class MyDoc(Document):
            __database__ = 'test'
            __collection__ = 'mydoc'
            structure = {
                'foo':int,
            }

        # test directly from a connection
        mydoc = self.connection.MyDoc()
        mydoc['foo'] = 3
        mydoc.save()
        self.assertEqual(mydoc.collection.name, 'mydoc')
        self.assertEqual(mydoc.collection.database.name, 'test')
        self.assertEqual(self.col.MyDoc.find_one(), None)

        raw_doc = self.connection.MyDoc.find_one()
        self.assertEqual(raw_doc['foo'], 3)
        self.assertEqual(raw_doc, mydoc)
        assert isinstance(raw_doc, MyDoc)

        # test directly from a database
        mydoc = self.connection.othertest.MyDoc()
        mydoc['foo'] = 3
        mydoc.save()
        self.assertEqual(mydoc.collection.name, 'mydoc')
        self.assertEqual(mydoc.collection.database.name, 'othertest')
        self.assertEqual(self.col.MyDoc.find_one(), None)

        raw_doc = self.connection.othertest.MyDoc.find_one()
        self.assertEqual(raw_doc['foo'], 3)
        self.assertEqual(raw_doc, mydoc)
        assert isinstance(raw_doc, MyDoc)

        # and still can use it via a collection
        mydoc = self.col.MyDoc()
        mydoc['foo'] = 3
        mydoc.save()
        self.assertEqual(mydoc.collection.name, 'mongokit')
        self.assertEqual(mydoc.collection.database.name, 'test')

        raw_doc = self.col.MyDoc.find_one()
        self.assertEqual(raw_doc['foo'], 3)
        self.assertEqual(raw_doc, mydoc)
        assert isinstance(raw_doc, MyDoc)


    def test_no_collection_in_virtual_document(self):
        @self.connection.register
        class Root(Document):
            __database__ = "test"

        @self.connection.register
        class DocA(Root):
           __collection__ = "doca"
           structure = {'title':unicode}

        doc = self.connection.DocA()
        doc['title'] = u'foo'
        doc.save()

        self.assertEqual(self.connection.test.doca.find_one(), doc)


    def test_basestring_type(self):
        @self.connection.register
        class DocA(Document):
           __database__ = 'test'
           __collection__ = "doca"
           structure = {'title':unicode}

        doc = self.connection.DocA()
        doc['title'] = 'foo'
        failed = False
        try:
            doc.save()
        except SchemaTypeError, e:
            self.assertEqual(str(e), "title must be an instance of unicode not str")
            failed = True
        self.assertEqual(failed, True)

        @self.connection.register
        class DocA(Document):
           __database__ = 'test'
           __collection__ = "doca"
           authorized_types = Document.authorized_types+[str]
           structure = {'title':str}

        doc = self.connection.DocA()
        doc['title'] = u'foo'
        failed = False
        try:
            doc.save()
        except SchemaTypeError, e:
            self.assertEqual(str(e), "title must be an instance of str not unicode")
            failed = True
        self.assertEqual(failed, True)


        @self.connection.register
        class DocA(Document):
           __database__ = 'test'
           __collection__ = "doca"
           structure = {'title':basestring}

        doc = self.connection.DocA()
        doc['title'] = u'foo'
        doc.save()
        doc['title'] = 'foo'
        doc.save()

        self.assertEqual(self.connection.test.doca.find_one(), doc)

    def test_float_and_int_types(self):
        @self.connection.register
        class DocA(Document):
           __database__ = 'test'
           __collection__ = "doca"
           structure = {'foo':int}

        doc = self.connection.DocA()
        doc['foo'] = 3.0
        failed = False
        try:
            doc.save()
        except SchemaTypeError, e:
            self.assertEqual(str(e), "foo must be an instance of int not float")
            failed = True
        self.assertEqual(failed, True)

        @self.connection.register
        class DocA(Document):
           __database__ = 'test'
           __collection__ = "doca"
           authorized_types = Document.authorized_types+[str]
           structure = {'foo':float}

        doc = self.connection.DocA()
        doc['foo'] = 2
        failed = False
        try:
            doc.save()
        except SchemaTypeError, e:
            self.assertEqual(str(e), "foo must be an instance of float not int")
            failed = True
        self.assertEqual(failed, True)


        @self.connection.register
        class DocA(Document):
           __database__ = 'test'
           __collection__ = "doca"
           structure = {'foo':OR(int, float)}

        doc = self.connection.DocA()
        doc['foo'] = 3
        doc.save()
        doc['foo'] = 2.0
        doc.save()

        self.assertEqual(self.connection.test.doca.find_one(), doc)

    def test_cursor_slicing(self):
        @self.connection.register
        class DocA(Document):
            structure = {'foo':int}

        for i in range(10):
            doc = self.col.DocA()
            doc['foo'] = i
            doc.save()

        self.assertEqual(isinstance(self.col.DocA.find()[0], DocA), True)
        self.assertEqual(isinstance(self.col.DocA.find()[3], DocA), True)
        self.assertEqual(isinstance(self.col.DocA.find()[3:], self.col.DocA.find().__class__), True)

    def test_unwrapped_cursor(self):
        self.assertEqual(self.col.count(), 0)

        doc_id = self.col.save({}, safe=True)
        self.assertEqual(self.col.count(), 1)

        try:
            self.col.find(_id=doc_id)[0]

        except TypeError:
            self.fail("Cursor.__getitem__ raised TypeError unexpectedly!")

    def test_pass_connection_arguments_to_cursor(self):
        class MyDoc(Document):
            structure = {
                "foo":int,
                "bar":{"bla":int},
            }
        con = Connection(read_preference=ReadPreference.SECONDARY_PREFERRED,
            secondary_acceptable_latency_ms=16)
        con.register([MyDoc])
        col = con['test']['mongokit']
        assert col.MyDoc.find()._Cursor__read_preference == ReadPreference.SECONDARY_PREFERRED
        assert col.MyDoc.find()._Cursor__secondary_acceptable_latency_ms == 16
        con.close()