Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / psycopg2   python

Repository URL to install this package:

Version: 2.7.4 

/ tests / test_lobject.py

#!/usr/bin/env python

# test_lobject.py - unit test for large objects support
#
# Copyright (C) 2008-2011 James Henstridge  <james@jamesh.id.au>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
# License for more details.

import os
import shutil
import tempfile
from functools import wraps

import psycopg2
import psycopg2.extensions
from .testutils import (unittest, decorate_all_tests, skip_if_tpc_disabled,
    ConnectingTestCase, skip_if_green, slow)


def skip_if_no_lo(f):
    @wraps(f)
    def skip_if_no_lo_(self):
        if self.conn.server_version < 80100:
            return self.skipTest("large objects only supported from PG 8.1")
        else:
            return f(self)

    return skip_if_no_lo_

skip_lo_if_green = skip_if_green("libpq doesn't support LO in async mode")


class LargeObjectTestCase(ConnectingTestCase):
    def setUp(self):
        ConnectingTestCase.setUp(self)
        self.lo_oid = None
        self.tmpdir = None

    def tearDown(self):
        if self.tmpdir:
            shutil.rmtree(self.tmpdir, ignore_errors=True)

        if self.conn.closed:
            return

        if self.lo_oid is not None:
            self.conn.rollback()
            try:
                lo = self.conn.lobject(self.lo_oid, "n")
            except psycopg2.OperationalError:
                pass
            else:
                lo.unlink()

        ConnectingTestCase.tearDown(self)


class LargeObjectTests(LargeObjectTestCase):
    def test_create(self):
        lo = self.conn.lobject()
        self.assertNotEqual(lo, None)
        self.assertEqual(lo.mode[0], "w")

    def test_connection_needed(self):
        self.assertRaises(TypeError,
            psycopg2.extensions.lobject, [])

    def test_open_non_existent(self):
        # By creating then removing a large object, we get an Oid that
        # should be unused.
        lo = self.conn.lobject()
        lo.unlink()
        self.assertRaises(psycopg2.OperationalError, self.conn.lobject, lo.oid)

    def test_open_existing(self):
        lo = self.conn.lobject()
        lo2 = self.conn.lobject(lo.oid)
        self.assertNotEqual(lo2, None)
        self.assertEqual(lo2.oid, lo.oid)
        self.assertEqual(lo2.mode[0], "r")

    def test_open_for_write(self):
        lo = self.conn.lobject()
        lo2 = self.conn.lobject(lo.oid, "w")
        self.assertEqual(lo2.mode[0], "w")
        lo2.write(b"some data")

    def test_open_mode_n(self):
        # Openning an object in mode "n" gives us a closed lobject.
        lo = self.conn.lobject()
        lo.close()

        lo2 = self.conn.lobject(lo.oid, "n")
        self.assertEqual(lo2.oid, lo.oid)
        self.assertEqual(lo2.closed, True)

    def test_mode_defaults(self):
        lo = self.conn.lobject()
        lo2 = self.conn.lobject(mode=None)
        lo3 = self.conn.lobject(mode="")
        self.assertEqual(lo.mode, lo2.mode)
        self.assertEqual(lo.mode, lo3.mode)

    def test_close_connection_gone(self):
        lo = self.conn.lobject()
        self.conn.close()
        lo.close()

    def test_create_with_oid(self):
        # Create and delete a large object to get an unused Oid.
        lo = self.conn.lobject()
        oid = lo.oid
        lo.unlink()

        lo = self.conn.lobject(0, "w", oid)
        self.assertEqual(lo.oid, oid)

    def test_create_with_existing_oid(self):
        lo = self.conn.lobject()
        lo.close()

        self.assertRaises(psycopg2.OperationalError,
                          self.conn.lobject, 0, "w", lo.oid)
        self.assertTrue(not self.conn.closed)

    def test_import(self):
        self.tmpdir = tempfile.mkdtemp()
        filename = os.path.join(self.tmpdir, "data.txt")
        fp = open(filename, "wb")
        fp.write(b"some data")
        fp.close()

        lo = self.conn.lobject(0, "r", 0, filename)
        self.assertEqual(lo.read(), "some data")

    def test_close(self):
        lo = self.conn.lobject()
        self.assertEqual(lo.closed, False)
        lo.close()
        self.assertEqual(lo.closed, True)

    def test_write(self):
        lo = self.conn.lobject()
        self.assertEqual(lo.write(b"some data"), len("some data"))

    def test_write_large(self):
        lo = self.conn.lobject()
        data = "data" * 1000000
        self.assertEqual(lo.write(data), len(data))

    def test_read(self):
        lo = self.conn.lobject()
        lo.write(b"some data")
        lo.close()

        lo = self.conn.lobject(lo.oid)
        x = lo.read(4)
        self.assertEqual(type(x), type(''))
        self.assertEqual(x, "some")
        self.assertEqual(lo.read(), " data")

    def test_read_binary(self):
        lo = self.conn.lobject()
        lo.write(b"some data")
        lo.close()

        lo = self.conn.lobject(lo.oid, "rb")
        x = lo.read(4)
        self.assertEqual(type(x), type(b''))
        self.assertEqual(x, b"some")
        self.assertEqual(lo.read(), b" data")

    def test_read_text(self):
        lo = self.conn.lobject()
        snowman = "\u2603"
        lo.write("some data " + snowman)
        lo.close()

        lo = self.conn.lobject(lo.oid, "rt")
        x = lo.read(4)
        self.assertEqual(type(x), type(''))
        self.assertEqual(x, "some")
        self.assertEqual(lo.read(), " data " + snowman)

    @slow
    def test_read_large(self):
        lo = self.conn.lobject()
        data = "data" * 1000000
        lo.write("some" + data)
        lo.close()

        lo = self.conn.lobject(lo.oid)
        self.assertEqual(lo.read(4), "some")
        data1 = lo.read()
        # avoid dumping megacraps in the console in case of error
        self.assertTrue(data == data1,
            "%r... != %r..." % (data[:100], data1[:100]))

    def test_seek_tell(self):
        lo = self.conn.lobject()
        length = lo.write(b"some data")
        self.assertEqual(lo.tell(), length)
        lo.close()
        lo = self.conn.lobject(lo.oid)

        self.assertEqual(lo.seek(5, 0), 5)
        self.assertEqual(lo.tell(), 5)
        self.assertEqual(lo.read(), "data")

        # SEEK_CUR: relative current location
        lo.seek(5)
        self.assertEqual(lo.seek(2, 1), 7)
        self.assertEqual(lo.tell(), 7)
        self.assertEqual(lo.read(), "ta")

        # SEEK_END: relative to end of file
        self.assertEqual(lo.seek(-2, 2), length - 2)
        self.assertEqual(lo.read(), "ta")

    def test_unlink(self):
        lo = self.conn.lobject()
        lo.unlink()

        # the object doesn't exist now, so we can't reopen it.
        self.assertRaises(psycopg2.OperationalError, self.conn.lobject, lo.oid)
        # And the object has been closed.
        self.assertEqual(lo.closed, True)

    def test_export(self):
        lo = self.conn.lobject()
        lo.write(b"some data")

        self.tmpdir = tempfile.mkdtemp()
        filename = os.path.join(self.tmpdir, "data.txt")
        lo.export(filename)
        self.assertTrue(os.path.exists(filename))
        f = open(filename, "rb")
        try:
            self.assertEqual(f.read(), b"some data")
        finally:
            f.close()

    def test_close_twice(self):
        lo = self.conn.lobject()
        lo.close()
        lo.close()

    def test_write_after_close(self):
        lo = self.conn.lobject()
        lo.close()
        self.assertRaises(psycopg2.InterfaceError, lo.write, b"some data")

    def test_read_after_close(self):
        lo = self.conn.lobject()
        lo.close()
        self.assertRaises(psycopg2.InterfaceError, lo.read, 5)

    def test_seek_after_close(self):
        lo = self.conn.lobject()
        lo.close()
        self.assertRaises(psycopg2.InterfaceError, lo.seek, 0)

    def test_tell_after_close(self):
        lo = self.conn.lobject()
        lo.close()
        self.assertRaises(psycopg2.InterfaceError, lo.tell)

    def test_unlink_after_close(self):
        lo = self.conn.lobject()
        lo.close()
        # Unlink works on closed files.
        lo.unlink()

    def test_export_after_close(self):
        lo = self.conn.lobject()
        lo.write(b"some data")
        lo.close()

        self.tmpdir = tempfile.mkdtemp()
        filename = os.path.join(self.tmpdir, "data.txt")
        lo.export(filename)
        self.assertTrue(os.path.exists(filename))
        f = open(filename, "rb")
        try:
            self.assertEqual(f.read(), b"some data")
        finally:
            f.close()

    def test_close_after_commit(self):
        lo = self.conn.lobject()
        self.lo_oid = lo.oid
        self.conn.commit()

        # Closing outside of the transaction is okay.
        lo.close()

    def test_write_after_commit(self):
        lo = self.conn.lobject()
        self.lo_oid = lo.oid
        self.conn.commit()

        self.assertRaises(psycopg2.ProgrammingError, lo.write, b"some data")

    def test_read_after_commit(self):
        lo = self.conn.lobject()
        self.lo_oid = lo.oid
        self.conn.commit()

        self.assertRaises(psycopg2.ProgrammingError, lo.read, 5)

    def test_seek_after_commit(self):
        lo = self.conn.lobject()
        self.lo_oid = lo.oid
        self.conn.commit()

        self.assertRaises(psycopg2.ProgrammingError, lo.seek, 0)

    def test_tell_after_commit(self):
        lo = self.conn.lobject()
        self.lo_oid = lo.oid
        self.conn.commit()

        self.assertRaises(psycopg2.ProgrammingError, lo.tell)

    def test_unlink_after_commit(self):
        lo = self.conn.lobject()
        self.lo_oid = lo.oid
        self.conn.commit()
Loading ...