##############################################################################
#
# Copyright (c) 2009 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import gc
import unittest
from persistent.interfaces import UPTODATE
from persistent._compat import PYPY
from persistent.tests.utils import skipIfNoCExtension
# pylint:disable=protected-access,too-many-lines,too-many-public-methods
# pylint:disable=attribute-defined-outside-init,redefined-outer-name
# Sentinel meaning "argument not supplied"; ``_makePersist`` compares its
# ``jar`` and ``kind`` arguments against it so that an explicit ``None`` can
# still be passed through as a real value.
_marker = object()
class DummyPersistent(object):
    """Minimal stand-in for a persistent object, tracking only ``_p_state``."""

    # Name-mangled ring slot, pre-set to None at class level.
    _Persistent__ring = None

    def _p_activate(self):
        """Mark the object as up to date."""
        self._p_state = UPTODATE

    def _p_invalidate(self):
        """Turn the object into a ghost."""
        from persistent.interfaces import GHOST
        self._p_state = GHOST

    # Deactivation is the very same function object as invalidation.
    _p_deactivate = _p_invalidate

    def _p_invalidate_deactivate_helper(self, clear=True):
        """Delegate to ``_p_invalidate``; *clear* is accepted but unused."""
        self._p_invalidate()
class DummyConnection(object):
    """Empty placeholder object handed to caches and objects as their jar."""
def _len(seq):
    """Return the number of elements produced by iterating *seq*.

    Works for any iterable, including ones without ``__len__``; iterators
    are consumed in the process.
    """
    return sum(1 for _ in seq)
class PickleCacheTestMixin(object):
    """Test methods and helpers exercising a pickle cache.

    The methods call ``unittest`` assertion helpers on ``self``, so this
    mixin is presumably combined with ``unittest.TestCase`` by the concrete
    test classes (not visible in this part of the file -- confirm).
    """

    # py2/3 compat: prefer ``assertRaisesRegex`` and fall back to the older
    # ``assertRaisesRegexp`` spelling only where the new name is missing.
    # The fallback is itself looked up with a ``None`` default because
    # ``getattr`` evaluates its default argument eagerly, and the old alias
    # was removed from ``unittest.TestCase`` in Python 3.12; referencing it
    # directly would raise AttributeError while this class is being created.
    assertRaisesRegex = getattr(
        unittest.TestCase,
        'assertRaisesRegex',
        getattr(unittest.TestCase, 'assertRaisesRegexp', None))
def _getTargetClass(self):
    """Return the cache class under test.

    The result is a ``PickleCachePy`` subclass whose ``_CACHEABLE_TYPES``
    additionally includes ``DummyPersistent``.
    """
    from persistent.picklecache import PickleCachePy as BasePickleCache

    cacheable = BasePickleCache._CACHEABLE_TYPES + (DummyPersistent,)

    class PickleCache(BasePickleCache):
        _CACHEABLE_TYPES = cacheable

    return PickleCache
def _getTargetInterface(self):
    """Return the interface the cache class is verified against."""
    from persistent.interfaces import IPickleCache as interface
    return interface
def _makeOne(self, jar=None, target_size=10):
    """Build a cache instance from the target class.

    :param jar: object passed as the cache's jar; a new ``DummyConnection``
        is used when it is None.
    :param target_size: second positional argument given to the cache class.
    """
    connection = DummyConnection() if jar is None else jar
    cache_class = self._getTargetClass()
    return cache_class(connection, target_size)
def _getDummyPersistentClass(self):
    """Return the lightweight persistent stand-in class used by default."""
    dummy_class = DummyPersistent
    return dummy_class
def _getRealPersistentClass(self):
    """Return ``PersistentPy`` from ``persistent.persistence``."""
    from persistent.persistence import PersistentPy as real_class
    return real_class
def _makePersist(self, state=None, oid=b'foo', jar=_marker, kind=_marker):
    """Create an object suitable for storing in the cache.

    :param state: value assigned to ``_p_state``; GHOST when None.
    :param oid: value assigned to ``_p_oid``.
    :param jar: value assigned to ``_p_jar``; a new ``DummyConnection`` when
        omitted (an explicit None is passed through unchanged).
    :param kind: zero-argument callable producing the object; defaults to
        the class returned by ``_getDummyPersistentClass``.
    :returns: the newly created object.
    """
    from persistent.interfaces import GHOST
    state = GHOST if state is None else state
    jar = DummyConnection() if jar is _marker else jar
    if kind is _marker:
        kind = self._getDummyPersistentClass()
    obj = kind()
    try:
        obj._p_state = state
    except AttributeError:
        # Deliberately ignored: the object keeps whatever state it already
        # has (presumably kinds exposing a read-only _p_state -- confirm).
        pass
    obj._p_oid = oid
    obj._p_jar = jar
    return obj
def test_class_conforms_to_IPickleCache(self):
    # The cache class itself must satisfy the target interface.
    from zope.interface.verify import verifyClass
    interface = self._getTargetInterface()
    cache_class = self._getTargetClass()
    verifyClass(interface, cache_class)
def test_instance_conforms_to_IPickleCache(self):
    # A freshly built cache instance must satisfy the target interface.
    from zope.interface.verify import verifyObject
    interface = self._getTargetInterface()
    cache = self._makeOne()
    verifyObject(interface, cache)
def test_empty(self):
    # A new cache holds nothing, reports zero for every counter, and keeps
    # the target size (10) it was built with.
    cache = self._makeOne()
    expect = self.assertEqual
    expect(len(cache), 0)
    expect(_len(cache.items()), 0)
    expect(_len(cache.klass_items()), 0)
    expect(cache.ringlen(), 0)
    expect(len(cache.lru_items()), 0)
    expect(cache.cache_size, 10)
    expect(cache.cache_drain_resistance, 0)
    expect(cache.cache_non_ghost_count, 0)
    expect(dict(cache.cache_data), {})
    expect(cache.cache_klass_count, 0)
def test___getitem___nonesuch_raises_KeyError(self):
    # Indexing an empty cache with an unknown key raises KeyError.
    cache = self._makeOne()
    with self.assertRaises(KeyError):
        cache['nonesuch']  # pylint:disable=pointless-statement
def test_get_nonesuch_no_default(self):
    # get() on an unknown key without a default returns None. assertIsNone
    # checks identity, so an object that merely compares equal to None
    # would not satisfy the test.
    cache = self._makeOne()
    self.assertIsNone(cache.get('nonesuch'))
def test_get_nonesuch_w_default(self):
    # get() on an unknown key hands back exactly the supplied default.
    cache = self._makeOne()
    # A fresh instance is a unique sentinel; the builtin ``object`` type is
    # a shared global and cannot prove the value came from this call.
    default = object()
    self.assertIs(cache.get('nonesuch', default), default)
def test___setitem___non_string_oid_raises_TypeError(self):
    # Storing under a key that is a plain object() raises TypeError.
    cache = self._makeOne()
    with self.assertRaises(TypeError):
        value = self._makePersist()
        cache[object()] = value
def test___setitem___duplicate_oid_same_obj(self):
    # Storing the very same object twice under its oid must not raise.
    KEY = b'original'
    cache = self._makeOne()
    original = self._makePersist(oid=KEY)
    for _ in range(2):
        cache[KEY] = original
def test___setitem___duplicate_oid_raises_ValueError(self):
    # A second, distinct object with an already-cached oid is rejected
    # with ValueError.
    KEY = b'original'
    cache = self._makeOne()
    first = self._makePersist(oid=KEY)
    cache[KEY] = first
    second = self._makePersist(oid=KEY)
    with self.assertRaises(ValueError):
        cache[KEY] = second
def test___setitem___ghost(self):
    # A ghost stored under its own oid is counted by len(), listed by
    # items(), retrievable by key, and absent from klass_items().
    # Returns the populated cache.
    from persistent.interfaces import GHOST
    KEY = b'ghost'
    cache = self._makeOne()
    ghost = self._makePersist(state=GHOST, oid=KEY)
    cache[KEY] = ghost
    self.assertEqual(len(cache), 1)
    entries = list(cache.items())
    self.assertEqual(len(entries), 1)
    self.assertEqual(_len(cache.klass_items()), 0)
    only_entry = entries[0]
    self.assertEqual(only_entry[0], KEY)
    self.assertIs(only_entry[1], ghost)
    self.assertIs(cache[KEY], ghost)
    return cache
def test___setitem___mismatch_key_oid(self):
    # The object keeps _makePersist's default oid, which differs from KEY,
    # so storing it under KEY raises ValueError.
    KEY = b'uptodate'
    cache = self._makeOne()
    mismatched = self._makePersist(state=UPTODATE)
    with self.assertRaises(ValueError):
        cache[KEY] = mismatched
def test___setitem___non_ghost(self):
    # An up-to-date object stored under its own oid is counted by len(),
    # listed by items(), occupies one ring slot, and is returned by both
    # indexing and get(); klass_items() stays empty.
    KEY = b'uptodate'
    cache = self._makeOne()
    uptodate = self._makePersist(state=UPTODATE, oid=KEY)
    cache[KEY] = uptodate
    self.assertEqual(len(cache), 1)
    items = list(cache.items())
    self.assertEqual(len(items), 1)
    self.assertEqual(_len(cache.klass_items()), 0)
    self.assertEqual(items[0][0], KEY)
    self.assertEqual(cache.ringlen(), 1)
    # assertIs (as used by the sibling tests) reports both operands on
    # failure, unlike assertTrue(a is b).
    self.assertIs(items[0][1], uptodate)
    self.assertIs(cache[KEY], uptodate)
    self.assertIs(cache.get(KEY), uptodate)
def test___setitem___persistent_class(self):
    # A class (rather than an instance) stored under its oid shows up in
    # klass_items() and is returned by indexing and get().
    # Returns the populated cache.
    KEY = b'pclass'

    class pclass(object):
        _p_oid = KEY
        _p_jar = DummyConnection()

    cache = self._makeOne(pclass._p_jar)
    cache[KEY] = pclass
    class_entries = list(cache.klass_items())
    self.assertEqual(len(cache), 1)
    self.assertEqual(len(class_entries), 1)
    only_entry = class_entries[0]
    self.assertEqual(only_entry[0], KEY)
    self.assertIs(only_entry[1], pclass)
    self.assertIs(cache[KEY], pclass)
    self.assertIs(cache.get(KEY), pclass)
    return cache
def test___delitem___non_string_oid_raises_TypeError(self):
    # Deleting with a key that is a plain object() raises TypeError.
    cache = self._makeOne()
    bogus_key = object()
    with self.assertRaises(TypeError):
        del cache[bogus_key]
def test___delitem___nonesuch_raises_KeyError(self):
    # Deleting a bytes key that was never stored raises KeyError.
    cache = self._makeOne()
    missing_key = b'nonesuch'
    with self.assertRaises(KeyError):
        del cache[missing_key]
def test___delitem___w_persistent_class(self):
    # After a cached class is deleted, get() falls back to its default and
    # the ring is empty. Returns ``(cache, KEY)``.
    KEY = b'pclass'

    class pclass(object):
        _p_oid = KEY
        _p_jar = DummyConnection()

    # Build the cache once (an earlier, immediately overwritten instance
    # was dead code) and give it the class's own jar, mirroring
    # test___setitem___persistent_class.
    cache = self._makeOne(pclass._p_jar)
    cache[KEY] = pclass
    del cache[KEY]
    self.assertIs(cache.get(KEY, self), self)
    self.assertEqual(cache.ringlen(), 0)
    return cache, KEY
def test___delitem___w_normal_object(self):
    # After an up-to-date object is deleted, get() returns the supplied
    # default (``self`` is used as a unique marker).
    KEY = b'uptodate'
    cache = self._makeOne()
    uptodate = self._makePersist(state=UPTODATE, oid=KEY)
    cache[KEY] = uptodate
    del cache[KEY]
    # assertIs reports both operands on failure, unlike assertTrue(a is b).
    self.assertIs(cache.get(KEY, self), self)
def test___delitem___w_ghost(self):
    # After a ghost is deleted, get() returns the supplied default
    # (``self`` is used as a unique marker).
    from persistent.interfaces import GHOST
    cache = self._makeOne()
    KEY = b'ghost'
    ghost = self._makePersist(state=GHOST, oid=KEY)
    cache[KEY] = ghost
    del cache[KEY]
    # assertIs reports both operands on failure, unlike assertTrue(a is b).
    self.assertIs(cache.get(KEY, self), self)
def test___delitem___w_remaining_object(self):
    # Deleting one of two cached objects removes only that entry; the
    # other one stays retrievable.
    cache = self._makeOne()
    REMAINS = b'remains'
    # Deliberately not called UPTODATE: a local of that name shadows the
    # module-level state constant, which made the ``state=UPTODATE``
    # arguments below receive this bytes key instead of the real state.
    DELETED = b'uptodate'
    remains = self._makePersist(state=UPTODATE, oid=REMAINS)
    uptodate = self._makePersist(state=UPTODATE, oid=DELETED)
    cache[REMAINS] = remains
    cache[DELETED] = uptodate
    del cache[DELETED]
    # assertIs reports both operands on failure, unlike assertTrue(a is b).
    self.assertIs(cache.get(DELETED, self), self)
    self.assertIs(cache.get(REMAINS, self), remains)
def test_lruitems(self):
    # lru_items() lists the three up-to-date objects in the order in which
    # they were stored.
    cache = self._makeOne()
    insertion_order = (b'one', b'two', b'three')
    for oid in insertion_order:
        cache[oid] = self._makePersist(oid=oid, state=UPTODATE)
    items = cache.lru_items()
    self.assertEqual(_len(items), 3)
    for position, oid in enumerate(insertion_order):
        self.assertEqual(items[position][0], oid)
def _numbered_oid(self, i):
    """Return the bytes oid ``b'oid_NNNN'`` for *i*, zero-padded to 4 digits."""
    # Python 3.4 doesn't support % on bytes, so the text is formatted as a
    # str first and encoded afterwards.
    return ('oid_%04d' % i).encode('ascii')
def _populate_cache(self, cache, count=100,
                    state_0=UPTODATE,
                    state_rest=UPTODATE):
    """Fill *cache* with *count* numbered objects and return their oids.

    :param cache: mapping-like cache that receives ``cache[oid] = obj``.
    :param count: number of objects to create (previously ignored in favour
        of a hard-coded 100, which is still the default).
    :param state_0: state given to the first object (index 0).
    :param state_rest: state given to every other object.
    :returns: list of the oids, in insertion order.
    """
    oids = []
    for i in range(count):
        oid = self._numbered_oid(i)
        oids.append(oid)
        state = state_0 if i == 0 else state_rest
        cache[oid] = self._makePersist(oid=oid, state=state)
    return oids
def test_incrgc_simple(self):
    # With 100 up-to-date objects in a cache built with the default target
    # size, incrgc() leaves 10 non-ghosts: the ten most recently added
    # oids, in insertion order. The first 90 are no longer retrievable.
    cache = self._makeOne()
    oids = self._populate_cache(cache)
    self.assertEqual(cache.cache_non_ghost_count, 100)
    cache.incrgc()
    gc.collect() # banish the ghosts who are no longer in the ring
    self.assertEqual(cache.cache_non_ghost_count, 10)
    items = cache.lru_items()
    self.assertEqual(_len(items), 10)
    survivors = (
        b'oid_0090',
        b'oid_0091',
        b'oid_0092',
        b'oid_0093',
        b'oid_0094',
        b'oid_0095',
        b'oid_0096',
        b'oid_0097',
        b'oid_0098',
        b'oid_0099',
    )
    for position, expected_oid in enumerate(survivors):
        self.assertEqual(items[position][0], expected_oid)
    for evicted_oid in oids[:90]:
        self.assertIsNone(cache.get(evicted_oid))
Loading ...