Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / persistent   python

Repository URL to install this package:

Version: 4.6.4 

/ tests / test_picklecache.py

##############################################################################
#
# Copyright (c) 2009 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import gc
import unittest

from persistent.interfaces import UPTODATE
from persistent._compat import PYPY
from persistent.tests.utils import skipIfNoCExtension


# pylint:disable=protected-access,too-many-lines,too-many-public-methods
# pylint:disable=attribute-defined-outside-init,redefined-outer-name

_marker = object()

class DummyPersistent(object):
    """Bare-bones stand-in for a persistent object in the cache tests.

    Only the ``_p_*`` hooks defined here are provided; the tests assign
    ``_p_state``, ``_p_oid`` and ``_p_jar`` on instances after creation.
    """

    # Class-level default for the ring attribute.
    _Persistent__ring = None

    def _p_invalidate(self):
        # Imported on demand, like the other state constants in this file.
        from persistent.interfaces import GHOST as ghost_state
        self._p_state = ghost_state

    # Deactivation is the same operation as invalidation for this dummy.
    _p_deactivate = _p_invalidate

    def _p_invalidate_deactivate_helper(self, clear=True):
        # ``clear`` is accepted but not used by this dummy.
        self._p_invalidate()

    def _p_activate(self):
        # Switch the dummy to the UPTODATE state.
        self._p_state = UPTODATE


class DummyConnection(object):
    """Empty stand-in used as the jar for caches and objects in these tests."""


def _len(seq):
    """Return how many items iterating *seq* yields (generators included)."""
    return sum(1 for _ in seq)


class PickleCacheTestMixin(object):

    # py2/3 compat: Python 2 only provides ``assertRaisesRegexp``, and that
    # alias was removed in Python 3.12.  The legacy name is therefore looked
    # up only when the modern one is missing, so evaluating this class body
    # never raises AttributeError.
    assertRaisesRegex = (
        getattr(unittest.TestCase, 'assertRaisesRegex', None)
        or getattr(unittest.TestCase, 'assertRaisesRegexp')
    )

    def _getTargetClass(self):
        """Return a ``PickleCachePy`` subclass whose cacheable types include the dummy."""
        from persistent.picklecache import PickleCachePy

        # Append DummyPersistent to the base class's _CACHEABLE_TYPES tuple.
        cacheable = PickleCachePy._CACHEABLE_TYPES + (DummyPersistent,)

        class PickleCache(PickleCachePy):
            _CACHEABLE_TYPES = cacheable

        return PickleCache

    def _getTargetInterface(self):
        """Return ``IPickleCache``, the interface the conformance tests verify."""
        from persistent.interfaces import IPickleCache as cache_interface
        return cache_interface

    def _makeOne(self, jar=None, target_size=10):
        """Build a cache of the target class; a new dummy jar is used if none is given."""
        connection = DummyConnection() if jar is None else jar
        cache_factory = self._getTargetClass()
        return cache_factory(connection, target_size)

    def _getDummyPersistentClass(self):
        """Return the class ``_makePersist`` instantiates when no kind is given."""
        dummy_class = DummyPersistent
        return dummy_class

    def _getRealPersistentClass(self):
        """Return ``PersistentPy`` from ``persistent.persistence``."""
        from persistent.persistence import PersistentPy as real_class
        return real_class

    def _makePersist(self, state=None, oid=b'foo', jar=_marker, kind=_marker):
        """Create an object of *kind* carrying the given state, oid and jar.

        *state* defaults to ``GHOST``.  Omitting *jar* supplies a new
        ``DummyConnection`` and omitting *kind* uses the dummy persistent
        class; an explicit ``jar=None`` is passed through unchanged.
        """
        from persistent.interfaces import GHOST

        connection = DummyConnection() if jar is _marker else jar
        factory = self._getDummyPersistentClass() if kind is _marker else kind
        obj = factory()
        try:
            obj._p_state = GHOST if state is None else state
        except AttributeError:
            # The kind does not allow assigning ``_p_state``; keep its own.
            pass
        obj._p_oid = oid
        obj._p_jar = connection
        return obj

    def test_class_conforms_to_IPickleCache(self):
        from zope.interface.verify import verifyClass

        # verifyClass raises if the class does not satisfy the interface.
        iface = self._getTargetInterface()
        klass = self._getTargetClass()
        verifyClass(iface, klass)

    def test_instance_conforms_to_IPickleCache(self):
        from zope.interface.verify import verifyObject

        # verifyObject raises if the instance does not satisfy the interface.
        iface = self._getTargetInterface()
        instance = self._makeOne()
        verifyObject(iface, instance)

    def test_empty(self):
        # A freshly built cache is empty in every view and keeps the
        # target size it was constructed with.
        cache = self._makeOne()

        self.assertEqual(len(cache), 0)
        self.assertEqual(_len(cache.items()), 0)
        self.assertEqual(_len(cache.klass_items()), 0)
        self.assertEqual(cache.ringlen(), 0)
        self.assertEqual(len(cache.lru_items()), 0)

        expected_attributes = (
            ('cache_size', 10),
            ('cache_drain_resistance', 0),
            ('cache_non_ghost_count', 0),
        )
        for attribute, expected in expected_attributes:
            self.assertEqual(getattr(cache, attribute), expected)

        self.assertEqual(dict(cache.cache_data), {})
        self.assertEqual(cache.cache_klass_count, 0)

    def test___getitem___nonesuch_raises_KeyError(self):
        cache = self._makeOne()

        # Subscripting with a key that was never stored must raise KeyError.
        with self.assertRaises(KeyError):
            cache['nonesuch']  # pylint:disable=pointless-statement

    def test_get_nonesuch_no_default(self):
        # get() on a missing key without an explicit default returns None.
        cache = self._makeOne()

        # Identity check: an object that merely compares equal to None
        # must not satisfy this test.
        self.assertIsNone(cache.get('nonesuch'))

    def test_get_nonesuch_w_default(self):
        # get() on a missing key hands back exactly the default it was given.
        cache = self._makeOne()
        # A fresh instance is a unique sentinel; the builtin ``object`` type
        # itself is a shared global and therefore a weaker marker.
        default = object()

        self.assertIs(cache.get('nonesuch', default), default)

    def test___setitem___non_string_oid_raises_TypeError(self):
        cache = self._makeOne()
        bad_key = object()  # an arbitrary non-string key

        with self.assertRaises(TypeError):
            cache[bad_key] = self._makePersist()

    def test___setitem___duplicate_oid_same_obj(self):
        # Storing the very same object twice under its oid must not raise;
        # the test passes simply by completing.
        KEY = b'original'
        cache = self._makeOne()
        original = self._makePersist(oid=KEY)
        for _ in range(2):
            cache[KEY] = original

    def test___setitem___duplicate_oid_raises_ValueError(self):
        # A second, distinct object may not take over an oid already in use.
        KEY = b'original'
        cache = self._makeOne()
        first = self._makePersist(oid=KEY)
        cache[KEY] = first
        second = self._makePersist(oid=KEY)

        with self.assertRaises(ValueError):
            cache[KEY] = second

    def test___setitem___ghost(self):
        from persistent.interfaces import GHOST

        # A ghost is stored under its oid and is not counted as a class.
        KEY = b'ghost'
        cache = self._makeOne()
        ghost = self._makePersist(state=GHOST, oid=KEY)

        cache[KEY] = ghost

        self.assertEqual(len(cache), 1)
        stored = list(cache.items())
        self.assertEqual(len(stored), 1)
        self.assertEqual(_len(cache.klass_items()), 0)
        only_entry = stored[0]
        self.assertEqual(only_entry[0], KEY)
        self.assertIs(only_entry[1], ghost)
        self.assertIs(cache[KEY], ghost)
        # The populated cache is handed back to the caller.
        return cache

    def test___setitem___mismatch_key_oid(self):
        # The object keeps _makePersist's default oid (b'foo'), which does
        # not match the key it is stored under.
        KEY = b'uptodate'
        cache = self._makeOne()
        mismatched = self._makePersist(state=UPTODATE)

        with self.assertRaises(ValueError):
            cache[KEY] = mismatched


    def test___setitem___non_ghost(self):
        # A non-ghost object is stored under its oid and counted in the ring.
        KEY = b'uptodate'
        cache = self._makeOne()
        uptodate = self._makePersist(state=UPTODATE, oid=KEY)

        cache[KEY] = uptodate

        self.assertEqual(len(cache), 1)
        items = list(cache.items())
        self.assertEqual(len(items), 1)
        self.assertEqual(_len(cache.klass_items()), 0)
        self.assertEqual(items[0][0], KEY)
        self.assertEqual(cache.ringlen(), 1)
        # assertIs reports both operands on failure, unlike assertTrue(a is b).
        self.assertIs(items[0][1], uptodate)
        self.assertIs(cache[KEY], uptodate)
        self.assertIs(cache.get(KEY), uptodate)

    def test___setitem___persistent_class(self):
        # A class stored in the cache is reported by klass_items().
        KEY = b'pclass'
        jar = DummyConnection()

        class pclass(object):
            _p_oid = KEY
            _p_jar = jar

        cache = self._makeOne(jar)

        cache[KEY] = pclass

        class_entries = list(cache.klass_items())
        self.assertEqual(len(cache), 1)
        self.assertEqual(len(class_entries), 1)
        only_entry = class_entries[0]
        self.assertEqual(only_entry[0], KEY)
        self.assertIs(only_entry[1], pclass)
        self.assertIs(cache[KEY], pclass)
        self.assertIs(cache.get(KEY), pclass)
        # The populated cache is handed back to the caller.
        return cache

    def test___delitem___non_string_oid_raises_TypeError(self):
        cache = self._makeOne()
        bad_key = object()  # an arbitrary non-string key

        with self.assertRaises(TypeError):
            del cache[bad_key]

    def test___delitem___nonesuch_raises_KeyError(self):
        # Deleting an oid that was never stored must raise KeyError.
        missing_oid = b'nonesuch'
        cache = self._makeOne()

        with self.assertRaises(KeyError):
            del cache[missing_oid]

    def test___delitem___w_persistent_class(self):
        # Deleting a cached class removes it; the ring length stays zero.
        KEY = b'pclass'

        class pclass(object):
            _p_oid = KEY
            _p_jar = DummyConnection()

        # A single cache suffices for this test.
        cache = self._makeOne()

        cache[KEY] = pclass
        del cache[KEY]
        # ``self`` is the default here: getting it back proves the key is gone.
        self.assertIs(cache.get(KEY, self), self)
        self.assertEqual(cache.ringlen(), 0)
        # The cache and key are handed back to the caller.
        return cache, KEY

    def test___delitem___w_normal_object(self):
        # Deleting a non-ghost object removes it from the cache.
        KEY = b'uptodate'
        cache = self._makeOne()
        uptodate = self._makePersist(state=UPTODATE, oid=KEY)

        cache[KEY] = uptodate

        del cache[KEY]
        # ``self`` is the default here: getting it back proves the key is gone.
        # assertIs reports both operands on failure, unlike assertTrue(a is b).
        self.assertIs(cache.get(KEY, self), self)

    def test___delitem___w_ghost(self):
        from persistent.interfaces import GHOST

        # Deleting a ghost removes it from the cache.
        cache = self._makeOne()
        KEY = b'ghost'
        ghost = self._makePersist(state=GHOST, oid=KEY)

        cache[KEY] = ghost

        del cache[KEY]
        # ``self`` is the default here: getting it back proves the key is gone.
        # assertIs reports both operands on failure, unlike assertTrue(a is b).
        self.assertIs(cache.get(KEY, self), self)

    def test___delitem___w_remaining_object(self):
        # Deleting one entry must leave the other entry in place.
        cache = self._makeOne()
        REMAINS = b'remains'
        # Named KEY rather than UPTODATE: a local called UPTODATE would
        # shadow the module-level state constant, and the bytes oid would
        # then be passed as ``state=`` below.
        KEY = b'uptodate'
        remains = self._makePersist(state=UPTODATE, oid=REMAINS)
        uptodate = self._makePersist(state=UPTODATE, oid=KEY)

        cache[REMAINS] = remains
        cache[KEY] = uptodate

        del cache[KEY]
        # ``self`` is the default here: getting it back proves the key is gone.
        self.assertIs(cache.get(KEY, self), self)
        self.assertIs(cache.get(REMAINS, self), remains)

    def test_lruitems(self):
        # lru_items() lists the non-ghost entries in insertion order.
        cache = self._makeOne()
        ONE = b'one'
        TWO = b'two'
        THREE = b'three'
        insertion_order = (ONE, TWO, THREE)
        for key in insertion_order:
            cache[key] = self._makePersist(oid=key, state=UPTODATE)

        items = cache.lru_items()
        self.assertEqual(_len(items), 3)
        for position, expected_key in enumerate(insertion_order):
            self.assertEqual(items[position][0], expected_key)


    def _numbered_oid(self, i):
        """Return the bytes oid ``b'oid_NNNN'`` for *i*, zero-padded to four digits."""
        # ``%`` formatting on bytes is unavailable on Python 3.4, so the
        # text is formatted first and then encoded.
        return ('oid_%04d' % i).encode('ascii')

    def _populate_cache(self, cache, count=100,
                        state_0=UPTODATE,
                        state_rest=UPTODATE):
        """Fill *cache* with *count* sequentially numbered dummy objects.

        The object at index 0 is created with *state_0*; every other object
        is created with *state_rest*.  Returns the list of oids in the order
        they were inserted.
        """
        oids = []
        # Honour ``count`` so callers can ask for a size other than 100.
        for i in range(count):
            oid = self._numbered_oid(i)
            oids.append(oid)
            state = state_0 if i == 0 else state_rest
            cache[oid] = self._makePersist(oid=oid, state=state)
        return oids

    def test_incrgc_simple(self):
        # With the default target size of 10, an incremental GC pass over
        # 100 non-ghost objects leaves only the last ten added in the ring.
        cache = self._makeOne()
        oids = self._populate_cache(cache)
        self.assertEqual(cache.cache_non_ghost_count, 100)

        cache.incrgc()
        gc.collect() # banish the ghosts who are no longer in the ring

        self.assertEqual(cache.cache_non_ghost_count, 10)
        items = cache.lru_items()
        self.assertEqual(_len(items), 10)
        survivors = (
            b'oid_0090',
            b'oid_0091',
            b'oid_0092',
            b'oid_0093',
            b'oid_0094',
            b'oid_0095',
            b'oid_0096',
            b'oid_0097',
            b'oid_0098',
            b'oid_0099',
        )
        for position, expected_oid in enumerate(survivors):
            self.assertEqual(items[position][0], expected_oid)

        # The first ninety objects are no longer retrievable from the cache.
        for oid in oids[:90]:
            self.assertIsNone(cache.get(oid))
Loading ...