Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
cql / test / test_native_connection.py
Size: Mime:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# To configure behavior, set $CQL_TEST_HOST to the destination address
# for native connections and $CQL_TEST_NATIVE_PORT to the associated port
# (the port defaults to 8000 when the variable is unset).

import os
import unittest
import contextlib
from thrift.transport import TTransport
import test_cql
from test_prepared_queries import MIN_THRIFT_FOR_CQL_3_0_0_FINAL
from test_connection import with_keyspace, TEST_HOST, randstring, cql

# Port used for native-protocol test connections; taken from the
# CQL_TEST_NATIVE_PORT environment variable, falling back to 8000.
TEST_NATIVE_PORT = int(os.getenv('CQL_TEST_NATIVE_PORT', '8000'))

class TestNativeConnection(unittest.TestCase):
    def setUp(self):
        self.randstr = randstring()
        self.with_keyspace = lambda curs, ver: with_keyspace(self.randstr, curs, ver)

    def test_connecting_with_cql_version(self):
        # 2.0.0 won't be supported by binary protocol
        self.assertRaises(cql.ProgrammingError,
                          cql.connect, TEST_HOST, TEST_NATIVE_PORT,
                          native=True, cql_version='2.0.0')

    def test_connecting_with_keyspace(self):
        # this conn is just for creating the keyspace
        conn = cql.connect(TEST_HOST, TEST_NATIVE_PORT, native=True)
        curs = conn.cursor()
        with self.with_keyspace(curs, conn.cql_version) as ksname:
            curs.execute('create table blah1_%s (a int primary key, b int);' % self.randstr)
            conn2 = cql.connect(TEST_HOST, TEST_NATIVE_PORT, keyspace=ksname,
                                native=True, cql_version=conn.cql_version)
            curs2 = conn2.cursor()
            curs2.execute('select * from blah1_%s;' % self.randstr)
            conn2.close()

    def test_execution_fails_after_close(self):
        conn = cql.connect(TEST_HOST, TEST_NATIVE_PORT, native=True)
        curs = conn.cursor()
        with self.with_keyspace(curs, conn.cql_version) as ksname:
            curs.execute('create table blah (a int primary key, b int);')
            curs.execute('select * from blah;')
        conn.close()
        self.assertRaises(cql.ProgrammingError, curs.execute, 'select * from blah;')

    def try_basic_stuff(self, conn):
        curs = conn.cursor()
        with self.with_keyspace(curs, conn.cql_version) as ksname:
            curs.execute('create table moo (a text primary key, b int, c float);')
            curs.execute("insert into moo (a, b, c) values (:d, :e, :f);",
                         {'d': 'hi', 'e': 1234, 'f': 1.234});
            qprep = curs.prepare_query("select * from moo where a = :fish;")
            curs.execute_prepared(qprep, {'fish': 'hi'})
            res = curs.fetchall()
            self.assertEqual(len(res), 1)
            self.assertEqual(res[0][0], 'hi')
            self.assertEqual(res[0][1], 1234)
            self.assertAlmostEqual(res[0][2], 1.234)

    def test_connecting_without_compression(self):
        conn = cql.connect(TEST_HOST, TEST_NATIVE_PORT, native=True, compression=False)
        self.assertEqual(conn.compressor, None)
        self.try_basic_stuff(conn)

    def test_connecting_with_compression(self):
        try:
            import snappy
        except ImportError:
            if hasattr(unittest, 'skipTest'):
                unittest.skipTest('Snappy compression not available')
            else:
                return
        conn = cql.connect(TEST_HOST, TEST_NATIVE_PORT, native=True, compression=True)
        self.assertEqual(conn.compressor, snappy.compress)
        self.try_basic_stuff(conn)