#!/usr/bin/env python
# test_cursor.py - unit test for cursor attributes
#
# Copyright (C) 2010-2011 Daniele Varrazzo <daniele.varrazzo@gmail.com>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import time
import pickle
import psycopg2
import psycopg2.extensions
from .testutils import (unittest, ConnectingTestCase, skip_before_postgres,
skip_if_no_namedtuple, skip_if_no_getrefcount, slow, skip_if_no_superuser,
skip_if_windows)
import psycopg2.extras
class CursorTests(ConnectingTestCase):
def test_close_idempotent(self):
cur = self.conn.cursor()
cur.close()
cur.close()
self.assertTrue(cur.closed)
def test_empty_query(self):
cur = self.conn.cursor()
self.assertRaises(psycopg2.ProgrammingError, cur.execute, "")
self.assertRaises(psycopg2.ProgrammingError, cur.execute, " ")
self.assertRaises(psycopg2.ProgrammingError, cur.execute, ";")
def test_executemany_propagate_exceptions(self):
conn = self.conn
cur = conn.cursor()
cur.execute("create temp table test_exc (data int);")
def buggygen():
yield 1 // 0
self.assertRaises(ZeroDivisionError,
cur.executemany, "insert into test_exc values (%s)", buggygen())
cur.close()
def test_mogrify_unicode(self):
conn = self.conn
cur = conn.cursor()
# test consistency between execute and mogrify.
# unicode query containing only ascii data
cur.execute("SELECT 'foo';")
self.assertEqual('foo', cur.fetchone()[0])
self.assertEqual(b"SELECT 'foo';", cur.mogrify("SELECT 'foo';"))
conn.set_client_encoding('UTF8')
snowman = "\u2603"
def b(s):
if isinstance(s, str):
return s.encode('utf8')
else:
return s
# unicode query with non-ascii data
cur.execute("SELECT '%s';" % snowman)
self.assertEqual(snowman.encode('utf8'), b(cur.fetchone()[0]))
self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify("SELECT '%s';" % snowman))
# unicode args
cur.execute("SELECT %s;", (snowman,))
self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0]))
self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify("SELECT %s;", (snowman,)))
# unicode query and args
cur.execute("SELECT %s;", (snowman,))
self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0]))
self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify("SELECT %s;", (snowman,)))
def test_mogrify_decimal_explodes(self):
# issue #7: explodes on windows with python 2.5 and psycopg 2.2.2
try:
from decimal import Decimal
except:
return
conn = self.conn
cur = conn.cursor()
self.assertEqual(b'SELECT 10.3;',
cur.mogrify("SELECT %s;", (Decimal("10.3"),)))
@skip_if_no_getrefcount
def test_mogrify_leak_on_multiple_reference(self):
# issue #81: reference leak when a parameter value is referenced
# more than once from a dict.
cur = self.conn.cursor()
foo = (lambda x: x)('foo') * 10
import sys
nref1 = sys.getrefcount(foo)
cur.mogrify("select %(foo)s, %(foo)s, %(foo)s", {'foo': foo})
nref2 = sys.getrefcount(foo)
self.assertEqual(nref1, nref2)
def test_modify_closed(self):
cur = self.conn.cursor()
cur.close()
sql = cur.mogrify("select %s", (10,))
self.assertEqual(sql, b"select 10")
def test_bad_placeholder(self):
cur = self.conn.cursor()
self.assertRaises(psycopg2.ProgrammingError,
cur.mogrify, "select %(foo", {})
self.assertRaises(psycopg2.ProgrammingError,
cur.mogrify, "select %(foo", {'foo': 1})
self.assertRaises(psycopg2.ProgrammingError,
cur.mogrify, "select %(foo, %(bar)", {'foo': 1})
self.assertRaises(psycopg2.ProgrammingError,
cur.mogrify, "select %(foo, %(bar)", {'foo': 1, 'bar': 2})
def test_cast(self):
curs = self.conn.cursor()
self.assertEqual(42, curs.cast(20, '42'))
self.assertAlmostEqual(3.14, curs.cast(700, '3.14'))
try:
from decimal import Decimal
except ImportError:
self.assertAlmostEqual(123.45, curs.cast(1700, '123.45'))
else:
self.assertEqual(Decimal('123.45'), curs.cast(1700, '123.45'))
from datetime import date
self.assertEqual(date(2011, 1, 2), curs.cast(1082, '2011-01-02'))
self.assertEqual("who am i?", curs.cast(705, 'who am i?')) # unknown
def test_cast_specificity(self):
curs = self.conn.cursor()
self.assertEqual("foo", curs.cast(705, 'foo'))
D = psycopg2.extensions.new_type((705,), "DOUBLING", lambda v, c: v * 2)
psycopg2.extensions.register_type(D, self.conn)
self.assertEqual("foofoo", curs.cast(705, 'foo'))
T = psycopg2.extensions.new_type((705,), "TREBLING", lambda v, c: v * 3)
psycopg2.extensions.register_type(T, curs)
self.assertEqual("foofoofoo", curs.cast(705, 'foo'))
curs2 = self.conn.cursor()
self.assertEqual("foofoo", curs2.cast(705, 'foo'))
def test_weakref(self):
from weakref import ref
curs = self.conn.cursor()
w = ref(curs)
del curs
import gc
gc.collect()
self.assertTrue(w() is None)
def test_null_name(self):
curs = self.conn.cursor(None)
self.assertEqual(curs.name, None)
def test_invalid_name(self):
curs = self.conn.cursor()
curs.execute("create temp table invname (data int);")
for i in (10, 20, 30):
curs.execute("insert into invname values (%s)", (i,))
curs.close()
curs = self.conn.cursor(r'1-2-3 \ "test"')
curs.execute("select data from invname order by data")
self.assertEqual(curs.fetchall(), [(10,), (20,), (30,)])
def _create_withhold_table(self):
curs = self.conn.cursor()
try:
curs.execute("drop table withhold")
except psycopg2.ProgrammingError:
self.conn.rollback()
curs.execute("create table withhold (data int)")
for i in (10, 20, 30):
curs.execute("insert into withhold values (%s)", (i,))
curs.close()
def test_withhold(self):
self.assertRaises(psycopg2.ProgrammingError, self.conn.cursor,
withhold=True)
self._create_withhold_table()
curs = self.conn.cursor("W")
self.assertEqual(curs.withhold, False)
curs.withhold = True
self.assertEqual(curs.withhold, True)
curs.execute("select data from withhold order by data")
self.conn.commit()
self.assertEqual(curs.fetchall(), [(10,), (20,), (30,)])
curs.close()
curs = self.conn.cursor("W", withhold=True)
self.assertEqual(curs.withhold, True)
curs.execute("select data from withhold order by data")
self.conn.commit()
self.assertEqual(curs.fetchall(), [(10,), (20,), (30,)])
curs = self.conn.cursor()
curs.execute("drop table withhold")
self.conn.commit()
def test_withhold_no_begin(self):
self._create_withhold_table()
curs = self.conn.cursor("w", withhold=True)
curs.execute("select data from withhold order by data")
self.assertEqual(curs.fetchone(), (10,))
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_INTRANS)
self.conn.commit()
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
self.assertEqual(curs.fetchone(), (20,))
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
curs.close()
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
def test_withhold_autocommit(self):
self._create_withhold_table()
self.conn.commit()
self.conn.autocommit = True
curs = self.conn.cursor("w", withhold=True)
curs.execute("select data from withhold order by data")
self.assertEqual(curs.fetchone(), (10,))
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
self.conn.commit()
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
curs.close()
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
def test_scrollable(self):
self.assertRaises(psycopg2.ProgrammingError, self.conn.cursor,
scrollable=True)
curs = self.conn.cursor()
curs.execute("create table scrollable (data int)")
curs.executemany("insert into scrollable values (%s)",
[(i,) for i in range(100)])
curs.close()
for t in range(2):
if not t:
curs = self.conn.cursor("S")
self.assertEqual(curs.scrollable, None)
curs.scrollable = True
else:
curs = self.conn.cursor("S", scrollable=True)
self.assertEqual(curs.scrollable, True)
curs.itersize = 10
# complex enough to make postgres cursors declare without
# scroll/no scroll to fail
curs.execute("""
select x.data
from scrollable x
join scrollable y on x.data = y.data
order by y.data""")
for i, (n,) in enumerate(curs):
self.assertEqual(i, n)
curs.scroll(-1)
for i in range(99, -1, -1):
curs.scroll(-1)
self.assertEqual(i, curs.fetchone()[0])
curs.scroll(-1)
curs.close()
def test_not_scrollable(self):
self.assertRaises(psycopg2.ProgrammingError, self.conn.cursor,
scrollable=False)
curs = self.conn.cursor()
curs.execute("create table scrollable (data int)")
curs.executemany("insert into scrollable values (%s)",
[(i,) for i in range(100)])
curs.close()
curs = self.conn.cursor("S") # default scrollability
curs.execute("select * from scrollable")
self.assertEqual(curs.scrollable, None)
curs.scroll(2)
try:
curs.scroll(-1)
except psycopg2.OperationalError:
return self.skipTest("can't evaluate non-scrollable cursor")
curs.close()
curs = self.conn.cursor("S", scrollable=False)
self.assertEqual(curs.scrollable, False)
curs.execute("select * from scrollable")
curs.scroll(2)
self.assertRaises(psycopg2.OperationalError, curs.scroll, -1)
@slow
@skip_before_postgres(8, 2)
def test_iter_named_cursor_efficient(self):
Loading ...