Repository URL to install this package:
|
Version:
3.12.2 ▾
|
# Copyright 2015-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
import sys
import time
import warnings
sys.path[0:0] = [""]
from bson.int64 import Int64
from bson.objectid import ObjectId
from bson.py3compat import text_type
from bson.son import SON
from pymongo import CursorType, monitoring, InsertOne, UpdateOne, DeleteOne
from pymongo.command_cursor import CommandCursor
from pymongo.errors import (AutoReconnect,
NotPrimaryError,
OperationFailure)
from pymongo.read_preferences import ReadPreference
from pymongo.write_concern import WriteConcern
from test import (client_context,
client_knobs,
IntegrationTest,
sanitize_cmd,
unittest)
from test.utils import (EventListener,
get_pool,
ignore_deprecations,
rs_or_single_client,
single_client,
wait_until)
class TestCommandMonitoring(IntegrationTest):
@classmethod
@client_context.require_connection
def setUpClass(cls):
super(TestCommandMonitoring, cls).setUpClass()
cls.listener = EventListener()
cls.client = rs_or_single_client(
event_listeners=[cls.listener],
retryWrites=False)
@classmethod
def tearDownClass(cls):
cls.client.close()
super(TestCommandMonitoring, cls).tearDownClass()
def tearDown(self):
self.listener.results.clear()
super(TestCommandMonitoring, self).tearDown()
def test_started_simple(self):
self.client.pymongo_test.command('ping')
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(SON([('ping', 1)]), started.command)
self.assertEqual('ping', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
self.assertTrue(isinstance(started.request_id, int))
def test_succeeded_simple(self):
self.client.pymongo_test.command('ping')
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertEqual('ping', succeeded.command_name)
self.assertEqual(self.client.address, succeeded.connection_id)
self.assertEqual(1, succeeded.reply.get('ok'))
self.assertTrue(isinstance(succeeded.request_id, int))
self.assertTrue(isinstance(succeeded.duration_micros, int))
def test_failed_simple(self):
try:
self.client.pymongo_test.command('oops!')
except OperationFailure:
pass
results = self.listener.results
started = results['started'][0]
failed = results['failed'][0]
self.assertEqual(0, len(results['succeeded']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertTrue(
isinstance(failed, monitoring.CommandFailedEvent))
self.assertEqual('oops!', failed.command_name)
self.assertEqual(self.client.address, failed.connection_id)
self.assertEqual(0, failed.failure.get('ok'))
self.assertTrue(isinstance(failed.request_id, int))
self.assertTrue(isinstance(failed.duration_micros, int))
def test_find_one(self):
self.client.pymongo_test.test.find_one()
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(
SON([('find', 'test'),
('filter', {}),
('limit', 1),
('singleBatch', True)]),
started.command)
self.assertEqual('find', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
self.assertTrue(isinstance(started.request_id, int))
def test_find_and_get_more(self):
self.client.pymongo_test.test.drop()
self.client.pymongo_test.test.insert_many([{} for _ in range(10)])
self.listener.results.clear()
cursor = self.client.pymongo_test.test.find(
projection={'_id': False},
batch_size=4)
for _ in range(4):
next(cursor)
cursor_id = cursor.cursor_id
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(
SON([('find', 'test'),
('filter', {}),
('projection', {'_id': False}),
('batchSize', 4)]),
started.command)
self.assertEqual('find', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
self.assertTrue(isinstance(started.request_id, int))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(isinstance(succeeded.duration_micros, int))
self.assertEqual('find', succeeded.command_name)
self.assertTrue(isinstance(succeeded.request_id, int))
self.assertEqual(cursor.address, succeeded.connection_id)
csr = succeeded.reply["cursor"]
self.assertEqual(csr["id"], cursor_id)
self.assertEqual(csr["ns"], "pymongo_test.test")
self.assertEqual(csr["firstBatch"], [{} for _ in range(4)])
self.listener.results.clear()
# Next batch. Exhausting the cursor could cause a getMore
# that returns id of 0 and no results.
next(cursor)
try:
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(
SON([('getMore', cursor_id),
('collection', 'test'),
('batchSize', 4)]),
started.command)
self.assertEqual('getMore', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
self.assertTrue(isinstance(started.request_id, int))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(isinstance(succeeded.duration_micros, int))
self.assertEqual('getMore', succeeded.command_name)
self.assertTrue(isinstance(succeeded.request_id, int))
self.assertEqual(cursor.address, succeeded.connection_id)
csr = succeeded.reply["cursor"]
self.assertEqual(csr["id"], cursor_id)
self.assertEqual(csr["ns"], "pymongo_test.test")
self.assertEqual(csr["nextBatch"], [{} for _ in range(4)])
finally:
# Exhaust the cursor to avoid kill cursors.
tuple(cursor)
def test_find_with_explain(self):
cmd = SON([('explain', SON([('find', 'test'),
('filter', {})]))])
self.client.pymongo_test.test.drop()
self.client.pymongo_test.test.insert_one({})
self.listener.results.clear()
coll = self.client.pymongo_test.test
# Test that we publish the unwrapped command.
if self.client.is_mongos:
coll = coll.with_options(
read_preference=ReadPreference.PRIMARY_PREFERRED)
res = coll.find().explain()
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(cmd, started.command)
self.assertEqual('explain', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
self.assertTrue(isinstance(started.request_id, int))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(isinstance(succeeded.duration_micros, int))
self.assertEqual('explain', succeeded.command_name)
self.assertTrue(isinstance(succeeded.request_id, int))
self.assertEqual(self.client.address, succeeded.connection_id)
self.assertEqual(res, succeeded.reply)
def _test_find_options(self, query, expected_cmd):
coll = self.client.pymongo_test.test
coll.drop()
coll.create_index('x')
coll.insert_many([{'x': i} for i in range(5)])
# Test that we publish the unwrapped command.
self.listener.results.clear()
if self.client.is_mongos:
coll = coll.with_options(
read_preference=ReadPreference.PRIMARY_PREFERRED)
cursor = coll.find(**query)
next(cursor)
try:
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(expected_cmd, started.command)
self.assertEqual('find', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
self.assertTrue(isinstance(started.request_id, int))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(isinstance(succeeded.duration_micros, int))
self.assertEqual('find', succeeded.command_name)
self.assertTrue(isinstance(succeeded.request_id, int))
self.assertEqual(self.client.address, succeeded.connection_id)
finally:
# Exhaust the cursor to avoid kill cursors.
tuple(cursor)
def test_find_options(self):
query = dict(filter={},
hint=[('x', 1)],
max_time_ms=10000,
max={'x': 10},
min={'x': -10},
return_key=True,
show_record_id=True,
projection={'x': False},
skip=1,
no_cursor_timeout=True,
sort=[('_id', 1)],
allow_partial_results=True,
comment='this is a test',
batch_size=2)
cmd = dict(find='test',
filter={},
hint=SON([('x', 1)]),
comment='this is a test',
maxTimeMS=10000,
max={'x': 10},
min={'x': -10},
returnKey=True,
showRecordId=True,
sort=SON([('_id', 1)]),
projection={'x': False},
skip=1,
batchSize=2,
noCursorTimeout=True,
allowPartialResults=True)
if client_context.version < (4, 1, 0, -1):
query['max_scan'] = 10
cmd['maxScan'] = 10
self._test_find_options(query, cmd)
@client_context.require_version_max(3, 7, 2)
def test_find_snapshot(self):
# Test "snapshot" parameter separately, can't combine with "sort".
query = dict(filter={},
snapshot=True)
cmd = dict(find='test',
filter={},
snapshot=True)
self._test_find_options(query, cmd)
def test_command_and_get_more(self):
self.client.pymongo_test.test.drop()
self.client.pymongo_test.test.insert_many(
[{'x': 1} for _ in range(10)])
self.listener.results.clear()
coll = self.client.pymongo_test.test
# Test that we publish the unwrapped command.
if self.client.is_mongos:
coll = coll.with_options(
read_preference=ReadPreference.PRIMARY_PREFERRED)
cursor = coll.aggregate(
[{'$project': {'_id': False, 'x': 1}}], batchSize=4)
for _ in range(4):
next(cursor)
cursor_id = cursor.cursor_id
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(
SON([('aggregate', 'test'),
('pipeline', [{'$project': {'_id': False, 'x': 1}}]),
('cursor', {'batchSize': 4})]),
started.command)
self.assertEqual('aggregate', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
self.assertTrue(isinstance(started.request_id, int))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(isinstance(succeeded.duration_micros, int))
self.assertEqual('aggregate', succeeded.command_name)
self.assertTrue(isinstance(succeeded.request_id, int))
self.assertEqual(cursor.address, succeeded.connection_id)
expected_cursor = {'id': cursor_id,
'ns': 'pymongo_test.test',
'firstBatch': [{'x': 1} for _ in range(4)]}
self.assertEqualCommand(expected_cursor, succeeded.reply.get('cursor'))
self.listener.results.clear()
next(cursor)
try:
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(
SON([('getMore', cursor_id),
('collection', 'test'),
('batchSize', 4)]),
started.command)
self.assertEqual('getMore', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
self.assertTrue(isinstance(started.request_id, int))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(isinstance(succeeded.duration_micros, int))
self.assertEqual('getMore', succeeded.command_name)
self.assertTrue(isinstance(succeeded.request_id, int))
self.assertEqual(cursor.address, succeeded.connection_id)
expected_result = {
'cursor': {'id': cursor_id,
'ns': 'pymongo_test.test',
'nextBatch': [{'x': 1} for _ in range(4)]},
'ok': 1.0}
self.assertEqualReply(expected_result, succeeded.reply)
finally:
# Exhaust the cursor to avoid kill cursors.
tuple(cursor)
def test_get_more_failure(self):
address = self.client.address
coll = self.client.pymongo_test.test
cursor_id = Int64(12345)
cursor_doc = {"id": cursor_id, "firstBatch": [], "ns": coll.full_name}
cursor = CommandCursor(coll, cursor_doc, address)
try:
next(cursor)
except Exception:
pass
results = self.listener.results
started = results['started'][0]
self.assertEqual(0, len(results['succeeded']))
failed = results['failed'][0]
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(
SON([('getMore', cursor_id),
('collection', 'test')]),
started.command)
self.assertEqual('getMore', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
self.assertTrue(isinstance(started.request_id, int))
self.assertTrue(
isinstance(failed, monitoring.CommandFailedEvent))
self.assertTrue(isinstance(failed.duration_micros, int))
self.assertEqual('getMore', failed.command_name)
self.assertTrue(isinstance(failed.request_id, int))
self.assertEqual(cursor.address, failed.connection_id)
self.assertEqual(0, failed.failure.get("ok"))
@client_context.require_replica_set
@client_context.require_secondaries_count(1)
def test_not_primary_error(self):
address = next(iter(client_context.client.secondaries))
client = single_client(*address, event_listeners=[self.listener])
# Clear authentication command results from the listener.
client.admin.command('ping')
self.listener.results.clear()
error = None
try:
client.pymongo_test.test.find_one_and_delete({})
except NotPrimaryError as exc:
error = exc.errors
results = self.listener.results
started = results['started'][0]
failed = results['failed'][0]
self.assertEqual(0, len(results['succeeded']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertTrue(
isinstance(failed, monitoring.CommandFailedEvent))
self.assertEqual('findAndModify', failed.command_name)
self.assertEqual(address, failed.connection_id)
self.assertEqual(0, failed.failure.get('ok'))
self.assertTrue(isinstance(failed.request_id, int))
self.assertTrue(isinstance(failed.duration_micros, int))
self.assertEqual(error, failed.failure)
@client_context.require_no_mongos
def test_exhaust(self):
self.client.pymongo_test.test.drop()
self.client.pymongo_test.test.insert_many([{} for _ in range(11)])
self.listener.results.clear()
cursor = self.client.pymongo_test.test.find(
projection={'_id': False},
batch_size=5,
cursor_type=CursorType.EXHAUST)
next(cursor)
cursor_id = cursor.cursor_id
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(SON([('find', 'test'),
('filter', {}),
('projection', {'_id': False}),
('batchSize', 5)]), started.command)
self.assertEqual('find', started.command_name)
self.assertEqual(cursor.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
self.assertTrue(isinstance(started.request_id, int))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(isinstance(succeeded.duration_micros, int))
self.assertEqual('find', succeeded.command_name)
self.assertTrue(isinstance(succeeded.request_id, int))
self.assertEqual(cursor.address, succeeded.connection_id)
expected_result = {
'cursor': {'id': cursor_id,
'ns': 'pymongo_test.test',
'firstBatch': [{} for _ in range(5)]},
'ok': 1}
self.assertEqualReply(expected_result, succeeded.reply)
self.listener.results.clear()
tuple(cursor)
results = self.listener.results
self.assertEqual(0, len(results['failed']))
for event in results['started']:
self.assertTrue(isinstance(event, monitoring.CommandStartedEvent))
self.assertEqualCommand(SON([('getMore', cursor_id),
('collection', 'test'),
('batchSize', 5)]), event.command)
self.assertEqual('getMore', event.command_name)
self.assertEqual(cursor.address, event.connection_id)
self.assertEqual('pymongo_test', event.database_name)
self.assertTrue(isinstance(event.request_id, int))
for event in results['succeeded']:
self.assertTrue(
isinstance(event, monitoring.CommandSucceededEvent))
self.assertTrue(isinstance(event.duration_micros, int))
self.assertEqual('getMore', event.command_name)
self.assertTrue(isinstance(event.request_id, int))
self.assertEqual(cursor.address, event.connection_id)
# Last getMore receives a response with cursor id 0.
self.assertEqual(0, results['succeeded'][-1].reply['cursor']['id'])
def test_kill_cursors(self):
with client_knobs(kill_cursor_frequency=0.01):
self.client.pymongo_test.test.drop()
self.client.pymongo_test.test.insert_many([{} for _ in range(10)])
cursor = self.client.pymongo_test.test.find().batch_size(5)
next(cursor)
cursor_id = cursor.cursor_id
self.listener.results.clear()
cursor.close()
time.sleep(2)
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
# There could be more than one cursor_id here depending on
# when the thread last ran.
self.assertIn(cursor_id, started.command['cursors'])
self.assertEqual('killCursors', started.command_name)
self.assertIs(type(started.connection_id), tuple)
self.assertEqual(cursor.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
self.assertTrue(isinstance(started.request_id, int))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(isinstance(succeeded.duration_micros, int))
self.assertEqual('killCursors', succeeded.command_name)
self.assertTrue(isinstance(succeeded.request_id, int))
self.assertIs(type(succeeded.connection_id), tuple)
self.assertEqual(cursor.address, succeeded.connection_id)
# There could be more than one cursor_id here depending on
# when the thread last ran.
self.assertTrue(cursor_id in succeeded.reply['cursorsUnknown']
or cursor_id in succeeded.reply['cursorsKilled'])
def test_non_bulk_writes(self):
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
# Implied write concern insert_one
res = coll.insert_one({'x': 1})
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('insert', coll.name),
('ordered', True),
('documents', [{'_id': res.inserted_id, 'x': 1}])])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
# Unacknowledged insert_one
self.listener.results.clear()
coll = coll.with_options(write_concern=WriteConcern(w=0))
res = coll.insert_one({'x': 1})
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('insert', coll.name),
('ordered', True),
('documents', [{'_id': res.inserted_id, 'x': 1}]),
('writeConcern', {'w': 0})])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
self.assertEqualReply(succeeded.reply, {'ok': 1})
# Explicit write concern insert_one
self.listener.results.clear()
coll = coll.with_options(write_concern=WriteConcern(w=1))
res = coll.insert_one({'x': 1})
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('insert', coll.name),
('ordered', True),
('documents', [{'_id': res.inserted_id, 'x': 1}]),
('writeConcern', {'w': 1})])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
# delete_many
self.listener.results.clear()
res = coll.delete_many({'x': 1})
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('delete', coll.name),
('ordered', True),
('deletes', [SON([('q', {'x': 1}),
('limit', 0)])]),
('writeConcern', {'w': 1})])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('delete', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(res.deleted_count, reply.get('n'))
# replace_one
self.listener.results.clear()
oid = ObjectId()
res = coll.replace_one({'_id': oid}, {'_id': oid, 'x': 1}, upsert=True)
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'_id': oid}),
('u', {'_id': oid, 'x': 1}),
('multi', False),
('upsert', True)])]),
('writeConcern', {'w': 1})])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('update', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
self.assertEqual([{'index': 0, '_id': oid}], reply.get('upserted'))
# update_one
self.listener.results.clear()
res = coll.update_one({'x': 1}, {'$inc': {'x': 1}})
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'x': 1}),
('u', {'$inc': {'x': 1}}),
('multi', False),
('upsert', False)])]),
('writeConcern', {'w': 1})])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('update', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
# update_many
self.listener.results.clear()
res = coll.update_many({'x': 2}, {'$inc': {'x': 1}})
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'x': 2}),
('u', {'$inc': {'x': 1}}),
('multi', True),
('upsert', False)])]),
('writeConcern', {'w': 1})])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('update', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
# delete_one
self.listener.results.clear()
res = coll.delete_one({'x': 3})
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('delete', coll.name),
('ordered', True),
('deletes', [SON([('q', {'x': 3}),
('limit', 1)])]),
('writeConcern', {'w': 1})])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('delete', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
self.assertEqual(0, coll.count_documents({}))
# write errors
coll.insert_one({'_id': 1})
try:
self.listener.results.clear()
coll.insert_one({'_id': 1})
except OperationFailure:
pass
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('insert', coll.name),
('ordered', True),
('documents', [{'_id': 1}]),
('writeConcern', {'w': 1})])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(0, reply.get('n'))
errors = reply.get('writeErrors')
self.assertIsInstance(errors, list)
error = errors[0]
self.assertEqual(0, error.get('index'))
self.assertIsInstance(error.get('code'), int)
self.assertIsInstance(error.get('errmsg'), text_type)
def test_legacy_writes(self):
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
# Implied write concern insert
_id = coll.insert({'x': 1})
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('insert', coll.name),
('ordered', True),
('documents', [{'_id': _id, 'x': 1}])])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
# Unacknowledged insert
self.listener.results.clear()
_id = coll.insert({'x': 1}, w=0)
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('insert', coll.name),
('ordered', True),
('documents', [{'_id': _id, 'x': 1}]),
('writeConcern', {'w': 0})])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
self.assertEqual(succeeded.reply, {'ok': 1})
# Explicit write concern insert
self.listener.results.clear()
_id = coll.insert({'x': 1}, w=1)
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('insert', coll.name),
('ordered', True),
('documents', [{'_id': _id, 'x': 1}]),
('writeConcern', {'w': 1})])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('insert', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
# remove all
self.listener.results.clear()
res = coll.remove({'x': 1}, w=1)
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('delete', coll.name),
('ordered', True),
('deletes', [SON([('q', {'x': 1}),
('limit', 0)])]),
('writeConcern', {'w': 1})])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('delete', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(res['n'], reply.get('n'))
# upsert
self.listener.results.clear()
oid = ObjectId()
coll.update({'_id': oid}, {'_id': oid, 'x': 1}, upsert=True, w=1)
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'_id': oid}),
('u', {'_id': oid, 'x': 1}),
('multi', False),
('upsert', True)])]),
('writeConcern', {'w': 1})])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('update', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
self.assertEqual([{'index': 0, '_id': oid}], reply.get('upserted'))
# update one
self.listener.results.clear()
coll.update({'x': 1}, {'$inc': {'x': 1}})
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'x': 1}),
('u', {'$inc': {'x': 1}}),
('multi', False),
('upsert', False)])])])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('update', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
# update many
self.listener.results.clear()
coll.update({'x': 2}, {'$inc': {'x': 1}}, multi=True)
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'x': 2}),
('u', {'$inc': {'x': 1}}),
('multi', True),
('upsert', False)])])])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('update', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
# remove one
self.listener.results.clear()
coll.remove({'x': 3}, multi=False)
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('delete', coll.name),
('ordered', True),
('deletes', [SON([('q', {'x': 3}),
('limit', 1)])])])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('delete', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
reply = succeeded.reply
self.assertEqual(1, reply.get('ok'))
self.assertEqual(1, reply.get('n'))
self.assertEqual(0, coll.count_documents({}))
def test_insert_many(self):
# This always uses the bulk API.
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
big = 'x' * (1024 * 1024 * 4)
docs = [{'_id': i, 'big': big} for i in range(6)]
coll.insert_many(docs)
results = self.listener.results
started = results['started']
succeeded = results['succeeded']
self.assertEqual(0, len(results['failed']))
documents = []
count = 0
operation_id = started[0].operation_id
self.assertIsInstance(operation_id, int)
for start, succeed in zip(started, succeeded):
self.assertIsInstance(start, monitoring.CommandStartedEvent)
cmd = sanitize_cmd(start.command)
self.assertEqual(['insert', 'ordered', 'documents'],
list(cmd.keys()))
self.assertEqual(coll.name, cmd['insert'])
self.assertIs(True, cmd['ordered'])
documents.extend(cmd['documents'])
self.assertEqual('pymongo_test', start.database_name)
self.assertEqual('insert', start.command_name)
self.assertIsInstance(start.request_id, int)
self.assertEqual(self.client.address, start.connection_id)
self.assertIsInstance(succeed, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeed.duration_micros, int)
self.assertEqual(start.command_name, succeed.command_name)
self.assertEqual(start.request_id, succeed.request_id)
self.assertEqual(start.connection_id, succeed.connection_id)
self.assertEqual(start.operation_id, operation_id)
self.assertEqual(succeed.operation_id, operation_id)
reply = succeed.reply
self.assertEqual(1, reply.get('ok'))
count += reply.get('n', 0)
self.assertEqual(documents, docs)
self.assertEqual(6, count)
def test_legacy_insert_many(self):
# On legacy servers this uses bulk OP_INSERT.
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
# Force two batches on legacy servers.
big = 'x' * (1024 * 1024 * 12)
docs = [{'_id': i, 'big': big} for i in range(6)]
coll.insert(docs)
results = self.listener.results
started = results['started']
succeeded = results['succeeded']
self.assertEqual(0, len(results['failed']))
documents = []
count = 0
operation_id = started[0].operation_id
self.assertIsInstance(operation_id, int)
for start, succeed in zip(started, succeeded):
self.assertIsInstance(start, monitoring.CommandStartedEvent)
cmd = sanitize_cmd(start.command)
self.assertEqual(['insert', 'ordered', 'documents'],
list(cmd.keys()))
self.assertEqual(coll.name, cmd['insert'])
self.assertIs(True, cmd['ordered'])
documents.extend(cmd['documents'])
self.assertEqual('pymongo_test', start.database_name)
self.assertEqual('insert', start.command_name)
self.assertIsInstance(start.request_id, int)
self.assertEqual(self.client.address, start.connection_id)
self.assertIsInstance(succeed, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeed.duration_micros, int)
self.assertEqual(start.command_name, succeed.command_name)
self.assertEqual(start.request_id, succeed.request_id)
self.assertEqual(start.connection_id, succeed.connection_id)
self.assertEqual(start.operation_id, operation_id)
self.assertEqual(succeed.operation_id, operation_id)
reply = succeed.reply
self.assertEqual(1, reply.get('ok'))
count += reply.get('n', 0)
self.assertEqual(documents, docs)
self.assertEqual(6, count)
def test_bulk_write(self):
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
coll.bulk_write([InsertOne({'_id': 1}),
UpdateOne({'_id': 1}, {'$set': {'x': 1}}),
DeleteOne({'_id': 1})])
results = self.listener.results
started = results['started']
succeeded = results['succeeded']
self.assertEqual(0, len(results['failed']))
operation_id = started[0].operation_id
pairs = list(zip(started, succeeded))
self.assertEqual(3, len(pairs))
for start, succeed in pairs:
self.assertIsInstance(start, monitoring.CommandStartedEvent)
self.assertEqual('pymongo_test', start.database_name)
self.assertIsInstance(start.request_id, int)
self.assertEqual(self.client.address, start.connection_id)
self.assertIsInstance(succeed, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeed.duration_micros, int)
self.assertEqual(start.command_name, succeed.command_name)
self.assertEqual(start.request_id, succeed.request_id)
self.assertEqual(start.connection_id, succeed.connection_id)
self.assertEqual(start.operation_id, operation_id)
self.assertEqual(succeed.operation_id, operation_id)
expected = SON([('insert', coll.name),
('ordered', True),
('documents', [{'_id': 1}])])
self.assertEqualCommand(expected, started[0].command)
expected = SON([('update', coll.name),
('ordered', True),
('updates', [SON([('q', {'_id': 1}),
('u', {'$set': {'x': 1}}),
('multi', False),
('upsert', False)])])])
self.assertEqualCommand(expected, started[1].command)
expected = SON([('delete', coll.name),
('ordered', True),
('deletes', [SON([('q', {'_id': 1}),
('limit', 1)])])])
self.assertEqualCommand(expected, started[2].command)
@client_context.require_failCommand_fail_point
def test_bulk_write_command_network_error(self):
coll = self.client.pymongo_test.test
self.listener.results.clear()
insert_network_error = {
'configureFailPoint': 'failCommand',
'mode': {'times': 1},
'data': {
'failCommands': ['insert'],
'closeConnection': True,
},
}
with self.fail_point(insert_network_error):
with self.assertRaises(AutoReconnect):
coll.bulk_write([InsertOne({'_id': 1})])
failed = self.listener.results['failed']
self.assertEqual(1, len(failed))
event = failed[0]
self.assertEqual(event.command_name, 'insert')
self.assertIsInstance(event.failure, dict)
self.assertEqual(event.failure['errtype'], 'AutoReconnect')
self.assertTrue(event.failure['errmsg'])
@client_context.require_failCommand_fail_point
def test_bulk_write_command_error(self):
coll = self.client.pymongo_test.test
self.listener.results.clear()
insert_command_error = {
'configureFailPoint': 'failCommand',
'mode': {'times': 1},
'data': {
'failCommands': ['insert'],
'closeConnection': False,
'errorCode': 10107, # NotPrimary
},
}
with self.fail_point(insert_command_error):
with self.assertRaises(NotPrimaryError):
coll.bulk_write([InsertOne({'_id': 1})])
failed = self.listener.results['failed']
self.assertEqual(1, len(failed))
event = failed[0]
self.assertEqual(event.command_name, 'insert')
self.assertIsInstance(event.failure, dict)
self.assertEqual(event.failure['code'], 10107)
self.assertTrue(event.failure['errmsg'])
@client_context.require_version_max(3, 4, 99)
def test_bulk_write_legacy_network_error(self):
self.listener.results.clear()
# Make the delete operation run on a closed connection.
self.client.admin.command('ping')
pool = get_pool(self.client)
sock_info = pool.sockets[0]
sock_info.sock.close()
# Test legacy unacknowledged write network error.
coll = self.client.pymongo_test.get_collection(
'test', write_concern=WriteConcern(w=0))
with self.assertRaises(AutoReconnect):
coll.bulk_write([InsertOne({'_id': 1})], ordered=False)
failed = self.listener.results['failed']
self.assertEqual(1, len(failed))
event = failed[0]
self.assertEqual(event.command_name, 'insert')
self.assertIsInstance(event.failure, dict)
self.assertEqual(event.failure['errtype'], 'AutoReconnect')
self.assertTrue(event.failure['errmsg'])
def test_write_errors(self):
coll = self.client.pymongo_test.test
coll.drop()
self.listener.results.clear()
try:
coll.bulk_write([InsertOne({'_id': 1}),
InsertOne({'_id': 1}),
InsertOne({'_id': 1}),
DeleteOne({'_id': 1})],
ordered=False)
except OperationFailure:
pass
results = self.listener.results
started = results['started']
succeeded = results['succeeded']
self.assertEqual(0, len(results['failed']))
operation_id = started[0].operation_id
pairs = list(zip(started, succeeded))
errors = []
for start, succeed in pairs:
self.assertIsInstance(start, monitoring.CommandStartedEvent)
self.assertEqual('pymongo_test', start.database_name)
self.assertIsInstance(start.request_id, int)
self.assertEqual(self.client.address, start.connection_id)
self.assertIsInstance(succeed, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeed.duration_micros, int)
self.assertEqual(start.command_name, succeed.command_name)
self.assertEqual(start.request_id, succeed.request_id)
self.assertEqual(start.connection_id, succeed.connection_id)
self.assertEqual(start.operation_id, operation_id)
self.assertEqual(succeed.operation_id, operation_id)
if 'writeErrors' in succeed.reply:
errors.extend(succeed.reply['writeErrors'])
self.assertEqual(2, len(errors))
fields = set(['index', 'code', 'errmsg'])
for error in errors:
self.assertTrue(fields.issubset(set(error)))
def test_first_batch_helper(self):
# Regardless of server version and use of helpers._first_batch
# this test should still pass.
self.listener.results.clear()
tuple(self.client.pymongo_test.test.list_indexes())
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('listIndexes', 'test'), ('cursor', {})])
self.assertEqualCommand(expected, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('listIndexes', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
self.assertTrue('cursor' in succeeded.reply)
self.assertTrue('ok' in succeeded.reply)
self.listener.results.clear()
self.client.pymongo_test.current_op(True)
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = SON([('currentOp', 1), ('$all', True)])
self.assertEqualCommand(expected, started.command)
self.assertEqual('admin', started.database_name)
self.assertEqual('currentOp', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
self.assertTrue('inprog' in succeeded.reply)
self.assertTrue('ok' in succeeded.reply)
if not client_context.is_mongos:
with ignore_deprecations():
self.client.fsync(lock=True)
self.listener.results.clear()
self.client.unlock()
# Wait for async unlock...
wait_until(
lambda: not self.client.is_locked, "unlock the database")
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
expected = {'fsyncUnlock': 1}
self.assertEqualCommand(expected, started.command)
self.assertEqual('admin', started.database_name)
self.assertEqual('fsyncUnlock', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertIsInstance(succeeded.duration_micros, int)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
self.assertTrue('info' in succeeded.reply)
self.assertTrue('ok' in succeeded.reply)
def test_sensitive_commands(self):
listeners = self.client._event_listeners
self.listener.results.clear()
cmd = SON([("getnonce", 1)])
listeners.publish_command_start(
cmd, "pymongo_test", 12345, self.client.address)
delta = datetime.timedelta(milliseconds=100)
listeners.publish_command_success(
delta, {'nonce': 'e474f4561c5eb40b', 'ok': 1.0},
"getnonce", 12345, self.client.address)
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertIsInstance(started, monitoring.CommandStartedEvent)
self.assertEqual({}, started.command)
self.assertEqual('pymongo_test', started.database_name)
self.assertEqual('getnonce', started.command_name)
self.assertIsInstance(started.request_id, int)
self.assertEqual(self.client.address, started.connection_id)
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
self.assertEqual(succeeded.duration_micros, 100000)
self.assertEqual(started.command_name, succeeded.command_name)
self.assertEqual(started.request_id, succeeded.request_id)
self.assertEqual(started.connection_id, succeeded.connection_id)
self.assertEqual({}, succeeded.reply)
class TestGlobalListener(IntegrationTest):
@classmethod
@client_context.require_connection
def setUpClass(cls):
super(TestGlobalListener, cls).setUpClass()
cls.listener = EventListener()
# We plan to call register(), which internally modifies _LISTENERS.
cls.saved_listeners = copy.deepcopy(monitoring._LISTENERS)
monitoring.register(cls.listener)
cls.client = single_client()
# Get one (authenticated) socket in the pool.
cls.client.pymongo_test.command('ping')
@classmethod
def tearDownClass(cls):
monitoring._LISTENERS = cls.saved_listeners
cls.client.close()
super(TestGlobalListener, cls).tearDownClass()
def setUp(self):
super(TestGlobalListener, self).setUp()
self.listener.results.clear()
def test_simple(self):
self.client.pymongo_test.command('ping')
results = self.listener.results
started = results['started'][0]
succeeded = results['succeeded'][0]
self.assertEqual(0, len(results['failed']))
self.assertTrue(
isinstance(succeeded, monitoring.CommandSucceededEvent))
self.assertTrue(
isinstance(started, monitoring.CommandStartedEvent))
self.assertEqualCommand(SON([('ping', 1)]), started.command)
self.assertEqual('ping', started.command_name)
self.assertEqual(self.client.address, started.connection_id)
self.assertEqual('pymongo_test', started.database_name)
self.assertTrue(isinstance(started.request_id, int))
class TestEventClasses(unittest.TestCase):
def test_command_event_repr(self):
request_id, connection_id, operation_id = 1, ('localhost', 27017), 2
event = monitoring.CommandStartedEvent(
{'ping': 1}, 'admin', request_id, connection_id, operation_id)
self.assertEqual(
repr(event),
"<CommandStartedEvent ('localhost', 27017) db: 'admin', "
"command: 'ping', operation_id: 2, service_id: None>")
delta = datetime.timedelta(milliseconds=100)
event = monitoring.CommandSucceededEvent(
delta, {'ok': 1}, 'ping', request_id, connection_id,
operation_id)
self.assertEqual(
repr(event),
"<CommandSucceededEvent ('localhost', 27017) "
"command: 'ping', operation_id: 2, duration_micros: 100000, "
"service_id: None>")
event = monitoring.CommandFailedEvent(
delta, {'ok': 0}, 'ping', request_id, connection_id,
operation_id)
self.assertEqual(
repr(event),
"<CommandFailedEvent ('localhost', 27017) "
"command: 'ping', operation_id: 2, duration_micros: 100000, "
"failure: {'ok': 0}, service_id: None>")
def test_server_heartbeat_event_repr(self):
connection_id = ('localhost', 27017)
event = monitoring.ServerHeartbeatStartedEvent(connection_id)
self.assertEqual(
repr(event),
"<ServerHeartbeatStartedEvent ('localhost', 27017)>")
delta = 0.1
event = monitoring.ServerHeartbeatSucceededEvent(
delta, {'ok': 1}, connection_id)
self.assertEqual(
repr(event),
"<ServerHeartbeatSucceededEvent ('localhost', 27017) "
"duration: 0.1, awaited: False, reply: {'ok': 1}>")
event = monitoring.ServerHeartbeatFailedEvent(
delta, 'ERROR', connection_id)
self.assertEqual(
repr(event),
"<ServerHeartbeatFailedEvent ('localhost', 27017) "
"duration: 0.1, awaited: False, reply: 'ERROR'>")
def test_server_event_repr(self):
server_address = ('localhost', 27017)
topology_id = ObjectId('000000000000000000000001')
event = monitoring.ServerOpeningEvent(server_address, topology_id)
self.assertEqual(
repr(event),
"<ServerOpeningEvent ('localhost', 27017) "
"topology_id: 000000000000000000000001>")
event = monitoring.ServerDescriptionChangedEvent(
'PREV', 'NEW', server_address, topology_id)
self.assertEqual(
repr(event),
"<ServerDescriptionChangedEvent ('localhost', 27017) "
"changed from: PREV, to: NEW>")
event = monitoring.ServerClosedEvent(server_address, topology_id)
self.assertEqual(
repr(event),
"<ServerClosedEvent ('localhost', 27017) "
"topology_id: 000000000000000000000001>")
def test_topology_event_repr(self):
topology_id = ObjectId('000000000000000000000001')
event = monitoring.TopologyOpenedEvent(topology_id)
self.assertEqual(
repr(event),
"<TopologyOpenedEvent topology_id: 000000000000000000000001>")
event = monitoring.TopologyDescriptionChangedEvent(
'PREV', 'NEW', topology_id)
self.assertEqual(
repr(event),
"<TopologyDescriptionChangedEvent "
"topology_id: 000000000000000000000001 "
"changed from: PREV, to: NEW>")
event = monitoring.TopologyClosedEvent(topology_id)
self.assertEqual(
repr(event),
"<TopologyClosedEvent topology_id: 000000000000000000000001>")
if __name__ == "__main__":
unittest.main()