Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
pymongo / test / performance / perf_test.py
Size: Mime:
# Copyright 2015 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for the MongoDB Driver Performance Benchmarking Spec."""

import multiprocessing as mp
import os
import sys
import tempfile
import warnings

try:
    import simplejson as json
except ImportError:
    import json

sys.path[0:0] = [""]

from bson import decode, encode
from bson.json_util import loads
from gridfs import GridFSBucket
from pymongo import MongoClient
from pymongo.monotonic import time
from test import client_context, host, port, unittest

# Upper bound on the number of timed iterations per benchmark.
NUM_ITERATIONS = 100
# Budget compared against elapsed pymongo.monotonic.time() in runTest;
# presumably seconds -- TODO confirm.
MAX_ITERATION_TIME = 300
# Number of documents / operations used by the per-document benchmarks.
NUM_DOCS = 10000

# Root directory of the benchmark data sets.  Defaults to the "data"
# directory next to this file; override with the TEST_PATH environment
# variable.
TEST_PATH = os.environ.get(
    'TEST_PATH',
    os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data'))

# Optional path the JSON report is written to; when unset (or empty) the
# report is printed instead.
OUTPUT_FILE = os.environ.get('OUTPUT_FILE')

# One entry is appended per finished benchmark and dumped at module teardown.
result_data = []

def tearDownModule():
    """Emit the collected benchmark results as indented JSON.

    The report is written to OUTPUT_FILE when that is set; otherwise it is
    printed.
    """
    report = json.dumps({'results': result_data}, indent=4)
    if not OUTPUT_FILE:
        print(report)
        return
    with open(OUTPUT_FILE, 'w') as sink:
        sink.write(report)


class Timer(object):
    """Context manager that records how long its ``with`` body took.

    After the block exits, ``start`` and ``end`` hold the clock readings and
    ``interval`` holds their difference.
    """

    def __enter__(self):
        self.start = time()
        return self

    def __exit__(self, *args):
        # Returns None, so an exception raised inside the block propagates.
        finished = time()
        self.end = finished
        self.interval = finished - self.start


class PerformanceTest(object):
    """Mixin implementing the timing loop shared by every benchmark.

    Concrete benchmarks combine this class with ``unittest.TestCase`` and
    provide ``do_task`` (the timed work) and ``data_size`` (the numerator of
    the reported result).  ``before`` and ``after`` run around each timed
    iteration and do nothing by default.
    """

    @classmethod
    def setUpClass(cls):
        client_context.init()

    def setUp(self):
        pass

    def tearDown(self):
        """Print the median timing and record ``data_size / median``."""
        name = self.__class__.__name__
        # Compute the median once; it is used for both the result and the log.
        median = self.percentile(50)
        result = self.data_size / median
        print('Running %s. MEDIAN=%s' % (name, median))
        result_data.append({
            'name': name,
            'results': {
                '1': {
                    'ops_per_sec': result
                }
            }
        })

    def before(self):
        pass

    def after(self):
        pass

    def percentile(self, percentile):
        """Return the requested percentile of the recorded iteration times.

        ``percentile`` is a number in the range 0-100.  The test is failed
        (via ``self.fail``) when no timings were recorded, i.e. ``runTest``
        did not finish or completed zero iterations.
        """
        timings = getattr(self, 'results', None)
        if not timings:
            # fail() raises, so nothing below runs for a failed benchmark.
            self.fail('Test execution failed')
        sorted_results = sorted(timings)
        # Clamp at zero: for small percentiles or few samples the computed
        # index would be -1, which silently wraps around to the *largest*
        # timing instead of the smallest.
        percentile_index = max(
            int(len(sorted_results) * percentile / 100) - 1, 0)
        return sorted_results[percentile_index]

    def runTest(self):
        # Documented with comments rather than a docstring so unittest's
        # verbose test descriptions stay unchanged.
        #
        # Runs do_task up to NUM_ITERATIONS times, timing only the task
        # itself (before/after hooks are excluded), and stops early once the
        # total elapsed time exceeds MAX_ITERATION_TIME.
        results = []
        start = time()
        self.max_iterations = NUM_ITERATIONS
        for i in range(NUM_ITERATIONS):
            if time() - start > MAX_ITERATION_TIME:
                warnings.warn('Test timed out, completed %s iterations.' % i)
                break
            self.before()
            with Timer() as timer:
                self.do_task()
            self.after()
            results.append(timer.interval)

        # Only published on success; if do_task raised, percentile() reports
        # the failure from tearDown.
        self.results = results


# BSON MICRO-BENCHMARKS
class BsonEncodingTest(PerformanceTest):
    """Benchmark encoding one document to BSON NUM_DOCS times per iteration.

    Subclasses supply ``dataset``, a file name under ``extended_bson``.
    """

    def setUp(self):
        # The data set is parsed with bson.json_util.loads before timing.
        dataset_path = os.path.join(TEST_PATH, 'extended_bson', self.dataset)
        with open(dataset_path) as source:
            self.document = loads(source.read())

    def do_task(self):
        for _ in range(NUM_DOCS):
            encode(self.document)


class BsonDecodingTest(PerformanceTest):
    """Benchmark decoding one BSON document NUM_DOCS times per iteration.

    Subclasses supply ``dataset``, a file name under ``extended_bson``.
    """

    def setUp(self):
        # NOTE(review): the file is parsed with the plain json module here,
        # whereas BsonEncodingTest uses bson.json_util.loads -- confirm that
        # the difference is intentional.
        dataset_path = os.path.join(TEST_PATH, 'extended_bson', self.dataset)
        with open(dataset_path) as source:
            self.document = encode(json.load(source))

    def do_task(self):
        for _ in range(NUM_DOCS):
            decode(self.document)


class TestFlatEncoding(BsonEncodingTest, unittest.TestCase):
    """BSON encoding benchmark over the ``flat_bson.json`` data set."""
    # File name resolved under TEST_PATH/extended_bson by the base setUp.
    dataset = 'flat_bson.json'
    # Numerator of the reported result (data_size / median iteration time);
    # presumably bytes processed per iteration -- TODO confirm.
    data_size = 75310000


class TestFlatDecoding(BsonDecodingTest, unittest.TestCase):
    """BSON decoding benchmark over the ``flat_bson.json`` data set."""
    # File name resolved under TEST_PATH/extended_bson by the base setUp.
    dataset = 'flat_bson.json'
    # Numerator of the reported result (data_size / median iteration time);
    # presumably bytes processed per iteration -- TODO confirm.
    data_size = 75310000


class TestDeepEncoding(BsonEncodingTest, unittest.TestCase):
    """BSON encoding benchmark over the ``deep_bson.json`` data set."""
    # File name resolved under TEST_PATH/extended_bson by the base setUp.
    dataset = 'deep_bson.json'
    # Numerator of the reported result (data_size / median iteration time);
    # presumably bytes processed per iteration -- TODO confirm.
    data_size = 19640000


class TestDeepDecoding(BsonDecodingTest, unittest.TestCase):
    """BSON decoding benchmark over the ``deep_bson.json`` data set."""
    # File name resolved under TEST_PATH/extended_bson by the base setUp.
    dataset = 'deep_bson.json'
    # Numerator of the reported result (data_size / median iteration time);
    # presumably bytes processed per iteration -- TODO confirm.
    data_size = 19640000


class TestFullEncoding(BsonEncodingTest, unittest.TestCase):
    """BSON encoding benchmark over the ``full_bson.json`` data set."""
    # File name resolved under TEST_PATH/extended_bson by the base setUp.
    dataset = 'full_bson.json'
    # Numerator of the reported result (data_size / median iteration time);
    # presumably bytes processed per iteration -- TODO confirm.
    data_size = 57340000


class TestFullDecoding(BsonDecodingTest, unittest.TestCase):
    """BSON decoding benchmark over the ``full_bson.json`` data set."""
    # File name resolved under TEST_PATH/extended_bson by the base setUp.
    dataset = 'full_bson.json'
    # Numerator of the reported result (data_size / median iteration time);
    # presumably bytes processed per iteration -- TODO confirm.
    data_size = 57340000


# SINGLE-DOC BENCHMARKS
class TestRunCommand(PerformanceTest, unittest.TestCase):
    """Benchmark issuing NUM_DOCS ``ping`` commands on the perftest database."""

    data_size = 160000

    def setUp(self):
        # Start every run from an empty 'perftest' database.
        self.client = client_context.client
        self.client.drop_database('perftest')

    def do_task(self):
        # Bind the method once so the loop measures only the command calls.
        run_command = self.client.perftest.command
        for _ in range(NUM_DOCS):
            run_command("ping")


class TestDocument(PerformanceTest):
    """Base for benchmarks built around one JSON template document.

    Subclasses set ``self.dataset`` (a file name under
    ``single_and_multi_document``) before calling ``setUp``.  By default a
    fresh ``corpus`` collection is created before and dropped after every
    timed iteration.
    """

    def setUp(self):
        # Load the template document.
        dataset_path = os.path.join(
            TEST_PATH, 'single_and_multi_document', self.dataset)
        with open(dataset_path, 'r') as source:
            self.document = json.load(source)

        # Start from an empty 'perftest' database.
        self.client = client_context.client
        self.client.drop_database('perftest')

    def tearDown(self):
        # Record the result first, then clean up the database.
        super(TestDocument, self).tearDown()
        self.client.drop_database('perftest')

    def before(self):
        self.corpus = self.client.perftest.create_collection('corpus')

    def after(self):
        self.client.perftest.drop_collection('corpus')


class TestFindOneByID(TestDocument, unittest.TestCase):
    """Benchmark ``find_one`` by ``_id`` over NUM_DOCS copies of a tweet."""

    data_size = 16220000

    def setUp(self):
        self.dataset = 'tweet.json'
        super(TestFindOneByID, self).setUp()

        # Seed the collection once and remember the generated ids.
        seed_docs = [self.document.copy() for _ in range(NUM_DOCS)]
        self.corpus = self.client.perftest.corpus
        self.inserted_ids = self.corpus.insert_many(seed_docs).inserted_ids

    def do_task(self):
        lookup = self.corpus.find_one
        for doc_id in self.inserted_ids:
            lookup({'_id': doc_id})

    # The seeded collection has to survive between iterations, so the
    # create/drop hooks inherited from TestDocument are disabled.
    def before(self):
        pass

    def after(self):
        pass


class TestSmallDocInsertOne(TestDocument, unittest.TestCase):
    """Benchmark NUM_DOCS single-document inserts of a small document."""

    data_size = 2750000

    def setUp(self):
        self.dataset = 'small_doc.json'
        super(TestSmallDocInsertOne, self).setUp()

        # Shallow copies of the template, one per insert.
        self.documents = [self.document.copy() for _ in range(NUM_DOCS)]

    def do_task(self):
        # self.corpus is (re)created by TestDocument.before for each iteration.
        insert = self.corpus.insert_one
        for document in self.documents:
            insert(document)


class TestLargeDocInsertOne(TestDocument, unittest.TestCase):
    """Benchmark ten single-document inserts of a large document."""

    data_size = 27310890

    def setUp(self):
        self.dataset = 'large_doc.json'
        super(TestLargeDocInsertOne, self).setUp()

        # Shallow copies of the template, one per insert.
        self.documents = [self.document.copy() for _ in range(10)]

    def do_task(self):
        # self.corpus is (re)created by TestDocument.before for each iteration.
        insert = self.corpus.insert_one
        for document in self.documents:
            insert(document)


# MULTI-DOC BENCHMARKS
class TestFindManyAndEmptyCursor(TestDocument, unittest.TestCase):
    """Benchmark reading the whole ``corpus`` collection through a cursor."""

    data_size = 16220000

    def setUp(self):
        self.dataset = 'tweet.json'
        super(TestFindManyAndEmptyCursor, self).setUp()

        # Seed the collection with ten raw insert commands of 1000 documents.
        perftest = self.client.perftest
        for _ in range(10):
            perftest.command(
                'insert', 'corpus',
                documents=[self.document] * 1000)
        self.corpus = perftest.corpus

    def do_task(self):
        # Materialising the cursor forces every document to be fetched.
        list(self.corpus.find())

    # The seeded collection has to survive between iterations, so the
    # create/drop hooks inherited from TestDocument are disabled.
    def before(self):
        pass

    def after(self):
        pass


class TestSmallDocBulkInsert(TestDocument, unittest.TestCase):
    """Benchmark one ordered ``insert_many`` of NUM_DOCS small documents."""

    data_size = 2750000

    def setUp(self):
        self.dataset = 'small_doc.json'
        super(TestSmallDocBulkInsert, self).setUp()
        # Shallow copies of the template, inserted together in do_task.
        self.documents = [self.document.copy() for _ in range(NUM_DOCS)]

    def before(self):
        # Fresh collection for every timed iteration.
        perftest = self.client.perftest
        self.corpus = perftest.create_collection('corpus')

    def do_task(self):
        self.corpus.insert_many(self.documents, ordered=True)


class TestLargeDocBulkInsert(TestDocument, unittest.TestCase):
    """Benchmark one ordered ``insert_many`` of ten large documents."""

    data_size = 27310890

    def setUp(self):
        self.dataset = 'large_doc.json'
        super(TestLargeDocBulkInsert, self).setUp()
        # Shallow copies of the template, inserted together in do_task.
        self.documents = [self.document.copy() for _ in range(10)]

    def before(self):
        # Fresh collection for every timed iteration.
        perftest = self.client.perftest
        self.corpus = perftest.create_collection('corpus')

    def do_task(self):
        self.corpus.insert_many(self.documents, ordered=True)


class TestGridFsUpload(PerformanceTest, unittest.TestCase):
    """Benchmark uploading ``gridfs_large.bin`` to a GridFS bucket."""

    data_size = 52428800

    def setUp(self):
        self.client = client_context.client
        self.client.drop_database('perftest')

        # Read the payload into memory up front so file I/O is not timed.
        payload_path = os.path.join(
            TEST_PATH, 'single_and_multi_document', 'gridfs_large.bin')
        with open(payload_path, 'rb') as source:
            self.document = source.read()

        self.bucket = GridFSBucket(self.client.perftest)

    def tearDown(self):
        # Record the result first, then clean up the database.
        super(TestGridFsUpload, self).tearDown()
        self.client.drop_database('perftest')

    def before(self):
        # Upload a one-byte file ahead of each timed upload.
        self.bucket.upload_from_stream('init', b'x')

    def do_task(self):
        self.bucket.upload_from_stream('gridfstest', self.document)


class TestGridFsDownload(PerformanceTest, unittest.TestCase):
    """Benchmark downloading ``gridfs_large.bin`` from a GridFS bucket."""

    data_size = 52428800

    def setUp(self):
        self.client = client_context.client
        self.client.drop_database('perftest')

        payload_path = os.path.join(
            TEST_PATH, 'single_and_multi_document', 'gridfs_large.bin')

        # Upload the file once; every iteration then reads it back by id.
        self.bucket = GridFSBucket(self.client.perftest)
        with open(payload_path, 'rb') as source:
            self.uploaded_id = self.bucket.upload_from_stream(
                'gridfstest', source)

    def tearDown(self):
        # Record the result first, then clean up the database.
        super(TestGridFsDownload, self).tearDown()
        self.client.drop_database('perftest')

    def do_task(self):
        stream = self.bucket.open_download_stream(self.uploaded_id)
        stream.read()


# MongoClient used by the multiprocessing worker functions in this module;
# it stays None until proc_init() assigns it.
proc_client = None


def proc_init(*dummy):
    """Create the module-level ``proc_client`` for the current process.

    Passed to ``mp.Pool`` as the worker initializer by ``mp_map``; any
    positional arguments are accepted and ignored.
    """
    global proc_client
    proc_client = MongoClient(host, port)


# PARALLEL BENCHMARKS
def mp_map(map_func, files):
    """Apply ``map_func`` to every entry of ``files`` in a process pool.

    Each worker is initialised with ``proc_init``.  The call blocks until
    all entries have been processed; any exception raised by a worker is
    re-raised here.
    """
    pool = mp.Pool(initializer=proc_init)
    try:
        pool.map(map_func, files)
    except BaseException:
        # Do not leave worker processes behind when the map fails or is
        # interrupted: stop them and wait for them to exit.
        pool.terminate()
        pool.join()
        raise
    # Success path is unchanged so the timed region measures the same work.
    pool.close()


def insert_json_file(filename):
    """Insert every line of a line-delimited JSON file into perftest.corpus.

    Runs inside a pool worker and uses the per-process ``proc_client``.
    """
    with open(filename, 'r') as source:
        corpus = proc_client.perftest.corpus
        parsed = list(map(json.loads, source))
        corpus.insert_many(parsed)


def insert_json_file_with_file_id(filename):
    """Insert a line-delimited JSON file, tagging each document with its file.

    Every parsed document gets a ``file`` field set to ``filename`` before
    the batch is inserted into perftest.corpus via ``proc_client``.
    """
    tagged = []
    with open(filename, 'r') as source:
        for raw_line in source:
            record = json.loads(raw_line)
            record['file'] = filename
            tagged.append(record)
    proc_client.perftest.corpus.insert_many(tagged)


def read_json_file(filename):
    """Export the documents tagged with ``filename`` as line-delimited JSON.

    Documents are read from perftest.corpus via ``proc_client`` (without
    their ``_id``) and written, one JSON document per line, to a temporary
    file that is discarded afterwards.
    """
    coll = proc_client.perftest.corpus
    # Text mode is required: json.dumps returns str, and TemporaryFile's
    # default binary mode ('w+b') rejects str with a TypeError on Python 3.
    temp = tempfile.TemporaryFile(mode='w')
    try:
        temp.writelines(
            [json.dumps(doc) + '\n' for
             doc in coll.find({'file': filename}, {'_id': False})])
    finally:
        temp.close()


def insert_gridfs_file(filename):
    """Upload the file at ``filename`` to GridFS, stored under that same name.

    Runs inside a pool worker and uses the per-process ``proc_client``.
    """
    target_bucket = GridFSBucket(proc_client.perftest)

    with open(filename, 'rb') as source:
        target_bucket.upload_from_stream(filename, source)


def read_gridfs_file(filename):
    """Download the GridFS file named ``filename`` into a throwaway temp file.

    Runs inside a pool worker and uses the per-process ``proc_client``.
    """
    source_bucket = GridFSBucket(proc_client.perftest)

    # The context manager closes (and thereby discards) the temporary file
    # whether or not the download succeeds.
    with tempfile.TemporaryFile() as sink:
        source_bucket.download_to_stream_by_name(filename, sink)


class TestJsonMultiImport(PerformanceTest, unittest.TestCase):
    """Benchmark importing the ``ldjson_multi`` files with a process pool."""

    data_size = 565000000

    def setUp(self):
        self.client = client_context.client
        self.client.drop_database('perftest')

    def before(self):
        # Create an empty target collection for this iteration.
        perftest = self.client.perftest
        perftest.command({'create': 'corpus'})
        self.corpus = perftest.corpus

        # Collect the full paths of the line-delimited JSON input files.
        ldjson_dir = os.path.join(TEST_PATH, 'parallel', 'ldjson_multi')
        self.files = [
            os.path.join(ldjson_dir, entry)
            for entry in os.listdir(ldjson_dir)]

    def do_task(self):
        mp_map(insert_json_file, self.files)

    def after(self):
        self.client.perftest.drop_collection('corpus')

    def tearDown(self):
        # Record the result first, then clean up the database.
        super(TestJsonMultiImport, self).tearDown()
        self.client.drop_database('perftest')


class TestJsonMultiExport(PerformanceTest, unittest.TestCase):
    """Benchmark exporting the ``ldjson_multi`` documents with a process pool.

    ``setUp`` loads every input file into perftest.corpus, tagging each
    document with its source file; each timed iteration then reads the
    documents back per file and serialises them to JSON.
    """

    data_size = 565000000

    def setUp(self):
        self.client = client_context.client
        self.client.drop_database('perftest')
        # Index the field the export queries filter on.  This previously
        # targeted the misspelled database 'perfest', leaving the benchmark
        # query unindexed and a stray database behind.
        self.client.perftest.corpus.create_index('file')

        ldjson_path = os.path.join(
            TEST_PATH, os.path.join('parallel', 'ldjson_multi'))
        self.files = [os.path.join(
            ldjson_path, s) for s in os.listdir(ldjson_path)]

        # Seed the collection once; iterations only read from it.
        mp_map(insert_json_file_with_file_id, self.files)

    def do_task(self):
        mp_map(read_json_file, self.files)

    def tearDown(self):
        # Record the result first, then clean up the database.
        super(TestJsonMultiExport, self).tearDown()
        self.client.drop_database('perftest')


class TestGridFsMultiFileUpload(PerformanceTest, unittest.TestCase):
    """Benchmark uploading the ``gridfs_multi`` files with a process pool."""

    data_size = 262144000

    def setUp(self):
        self.client = client_context.client
        self.client.drop_database('perftest')

    def before(self):
        # Empty the default GridFS collections before each timed upload.
        perftest = self.client.perftest
        perftest.drop_collection('fs.files')
        perftest.drop_collection('fs.chunks')

        self.bucket = GridFSBucket(self.client.perftest)
        gridfs_dir = os.path.join(TEST_PATH, 'parallel', 'gridfs_multi')
        self.files = [
            os.path.join(gridfs_dir, entry)
            for entry in os.listdir(gridfs_dir)]

    def do_task(self):
        mp_map(insert_gridfs_file, self.files)

    def tearDown(self):
        # Record the result first, then clean up the database.
        super(TestGridFsMultiFileUpload, self).tearDown()
        self.client.drop_database('perftest')


class TestGridFsMultiFileDownload(PerformanceTest, unittest.TestCase):
    """Benchmark downloading the ``gridfs_multi`` files with a process pool."""

    data_size = 262144000

    def setUp(self):
        self.client = client_context.client
        self.client.drop_database('perftest')

        seed_bucket = GridFSBucket(self.client.perftest)

        gridfs_dir = os.path.join(TEST_PATH, 'parallel', 'gridfs_multi')
        self.files = [
            os.path.join(gridfs_dir, entry)
            for entry in os.listdir(gridfs_dir)]

        # Upload every file once, named by its full path, so the workers can
        # download it by that name.
        for path in self.files:
            with open(path, 'rb') as source:
                seed_bucket.upload_from_stream(path, source)

    def do_task(self):
        mp_map(read_gridfs_file, self.files)

    def tearDown(self):
        # Record the result first, then clean up the database.
        super(TestGridFsMultiFileDownload, self).tearDown()
        self.client.drop_database('perftest')


# Run every benchmark in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()