Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Mako / test / test_cache.py
Size: Mime:
from mako.template import Template
from mako.lookup import TemplateLookup
from mako import lookup
import time
from test.util import result_lines
from test import TemplateTest, module_base
from test import eq_, SkipTest
from mako.compat import py27

from mako.ext import beaker_cache

if beaker_cache.has_beaker:
    import beaker

from mako.cache import register_plugin, CacheImpl


class SimpleBackend(object):
    def __init__(self):
        self.cache = {}

    def get(self, key, **kw):
        return self.cache[key]

    def invalidate(self, key, **kw):
        self.cache.pop(key, None)

    def put(self, key, value, **kw):
        self.cache[key] = value

    def get_or_create(self, key, creation_function, **kw):
        if key in self.cache:
            return self.cache[key]
        else:
            self.cache[key] = value = creation_function()
            return value


class MockCacheImpl(CacheImpl):
    realcacheimpl = None

    def __init__(self, cache):
        self.cache = cache

    def set_backend(self, cache, backend):
        if backend == 'simple':
            self.realcacheimpl = SimpleBackend()
        else:
            self.realcacheimpl = cache._load_impl(backend)

    def _setup_kwargs(self, kw):
        self.kwargs = kw.copy()
        self.kwargs.pop('regions', None)
        self.kwargs.pop('manager', None)
        if self.kwargs.get('region') != 'myregion':
            self.kwargs.pop('region', None)

    def get_or_create(self, key, creation_function, **kw):
        self.key = key
        self._setup_kwargs(kw)
        return self.realcacheimpl.\
            get_or_create(key, creation_function, **kw)

    def put(self, key, value, **kw):
        self.key = key
        self._setup_kwargs(kw)
        self.realcacheimpl.put(key, value, **kw)

    def get(self, key, **kw):
        self.key = key
        self._setup_kwargs(kw)
        return self.realcacheimpl.get(key, **kw)

    def invalidate(self, key, **kw):
        self.key = key
        self._setup_kwargs(kw)
        self.realcacheimpl.invalidate(key, **kw)


register_plugin("mock", __name__, "MockCacheImpl")


class CacheTest(TemplateTest):

    real_backend = 'simple'

    def _install_mock_cache(self, template, implname=None):
        template.cache_impl = 'mock'
        impl = template.cache.impl
        impl.set_backend(template.cache, implname or self.real_backend)
        return impl

    def test_def(self):
        t = Template("""
        <%!
            callcount = [0]
        %>
        <%def name="foo()" cached="True">
            this is foo
            <%
            callcount[0] += 1
            %>
        </%def>

        ${foo()}
        ${foo()}
        ${foo()}
        callcount: ${callcount}
""")
        m = self._install_mock_cache(t)
        assert result_lines(t.render()) == [
            'this is foo',
            'this is foo',
            'this is foo',
            'callcount: [1]',
        ]
        assert m.kwargs == {}

    def test_cache_enable(self):
        t = Template("""
            <%!
                callcount = [0]
            %>
            <%def name="foo()" cached="True">
                <% callcount[0] += 1 %>
            </%def>
            ${foo()}
            ${foo()}
            callcount: ${callcount}
        """, cache_enabled=False)
        self._install_mock_cache(t)

        eq_(t.render().strip(), "callcount: [2]")

    def test_nested_def(self):
        t = Template("""
        <%!
            callcount = [0]
        %>
        <%def name="foo()">
            <%def name="bar()" cached="True">
                this is foo
                <%
                callcount[0] += 1
                %>
            </%def>
            ${bar()}
        </%def>

        ${foo()}
        ${foo()}
        ${foo()}
        callcount: ${callcount}
""")
        m = self._install_mock_cache(t)
        assert result_lines(t.render()) == [
            'this is foo',
            'this is foo',
            'this is foo',
            'callcount: [1]',
        ]
        assert m.kwargs == {}

    def test_page(self):
        t = Template("""
        <%!
            callcount = [0]
        %>
        <%page cached="True"/>
        this is foo
        <%
        callcount[0] += 1
        %>
        callcount: ${callcount}
""")
        m = self._install_mock_cache(t)
        t.render()
        t.render()
        assert result_lines(t.render()) == [
            "this is foo",
            "callcount: [1]"
        ]
        assert m.kwargs == {}

    def test_dynamic_key_with_context(self):
        t = Template("""
            <%block name="foo" cached="True" cache_key="${mykey}">
                some block
            </%block>
        """)
        m = self._install_mock_cache(t)
        t.render(mykey="thekey")
        t.render(mykey="thekey")
        eq_(
            result_lines(t.render(mykey="thekey")),
            ["some block"]
        )
        eq_(m.key, "thekey")

        t = Template("""
            <%def name="foo()" cached="True" cache_key="${mykey}">
                some def
            </%def>
            ${foo()}
        """)
        m = self._install_mock_cache(t)
        t.render(mykey="thekey")
        t.render(mykey="thekey")
        eq_(
            result_lines(t.render(mykey="thekey")),
            ["some def"]
        )
        eq_(m.key, "thekey")

    def test_dynamic_key_with_funcargs(self):
        t = Template("""
            <%def name="foo(num=5)" cached="True" cache_key="foo_${str(num)}">
             hi
            </%def>

            ${foo()}
        """)
        m = self._install_mock_cache(t)
        t.render()
        t.render()
        assert result_lines(t.render()) == ['hi']
        assert m.key == "foo_5"

        t = Template("""
            <%def name="foo(*args, **kwargs)" cached="True"
             cache_key="foo_${kwargs['bar']}">
             hi
            </%def>

            ${foo(1, 2, bar='lala')}
        """)
        m = self._install_mock_cache(t)
        t.render()
        assert result_lines(t.render()) == ['hi']
        assert m.key == "foo_lala"

        t = Template('''
        <%page args="bar='hi'" cache_key="foo_${bar}" cached="True"/>
         hi
        ''')
        m = self._install_mock_cache(t)
        t.render()
        assert result_lines(t.render()) == ['hi']
        assert m.key == "foo_hi"

    def test_dynamic_key_with_imports(self):
        lookup = TemplateLookup()
        lookup.put_string("foo.html", """
        <%!
            callcount = [0]
        %>
        <%namespace file="ns.html" import="*"/>
        <%page cached="True" cache_key="${foo}"/>
        this is foo
        <%
        callcount[0] += 1
        %>
        callcount: ${callcount}
""")
        lookup.put_string("ns.html", """""")
        t = lookup.get_template("foo.html")
        m = self._install_mock_cache(t)
        t.render(foo='somekey')
        t.render(foo='somekey')
        assert result_lines(t.render(foo='somekey')) == [
            "this is foo",
            "callcount: [1]"
        ]
        assert m.kwargs == {}

    def test_fileargs_implicit(self):
        l = lookup.TemplateLookup(module_directory=module_base)
        l.put_string("test", """
                <%!
                    callcount = [0]
                %>
                <%def name="foo()" cached="True" cache_type='dbm'>
                    this is foo
                    <%
                    callcount[0] += 1
                    %>
                </%def>

                ${foo()}
                ${foo()}
                ${foo()}
                callcount: ${callcount}
        """)

        m = self._install_mock_cache(l.get_template('test'))
        assert result_lines(l.get_template('test').render()) == [
            'this is foo',
            'this is foo',
            'this is foo',
            'callcount: [1]',
        ]
        eq_(m.kwargs, {'type': 'dbm'})

    def test_fileargs_deftag(self):
        t = Template("""
        <%%!
            callcount = [0]
        %%>
        <%%def name="foo()" cached="True" cache_type='file' cache_dir='%s'>
            this is foo
            <%%
            callcount[0] += 1
            %%>
        </%%def>

        ${foo()}
        ${foo()}
        ${foo()}
        callcount: ${callcount}
""" % module_base)
        m = self._install_mock_cache(t)
        assert result_lines(t.render()) == [
            'this is foo',
            'this is foo',
            'this is foo',
            'callcount: [1]',
        ]
        assert m.kwargs == {'type': 'file', 'dir': module_base}

    def test_fileargs_pagetag(self):
        t = Template("""
        <%%page cache_dir='%s' cache_type='dbm'/>
        <%%!
            callcount = [0]
        %%>
        <%%def name="foo()" cached="True">
            this is foo
            <%%
            callcount[0] += 1
            %%>
        </%%def>

        ${foo()}
        ${foo()}
        ${foo()}
        callcount: ${callcount}
""" % module_base)
        m = self._install_mock_cache(t)
        assert result_lines(t.render()) == [
            'this is foo',
            'this is foo',
            'this is foo',
            'callcount: [1]',
        ]
        eq_(m.kwargs, {'dir': module_base, 'type': 'dbm'})

    def test_args_complete(self):
        t = Template("""
        <%%def name="foo()" cached="True" cache_timeout="30" cache_dir="%s"
         cache_type="file" cache_key='somekey'>
            this is foo
        </%%def>

        ${foo()}
""" % module_base)
        m = self._install_mock_cache(t)
        t.render()
        eq_(m.kwargs, {'dir': module_base, 'type': 'file', 'timeout': 30})

        t2 = Template("""
        <%%page cached="True" cache_timeout="30" cache_dir="%s"
         cache_type="file" cache_key='somekey'/>
        hi
        """ % module_base)
        m = self._install_mock_cache(t2)
        t2.render()
        eq_(m.kwargs, {'dir': module_base, 'type': 'file', 'timeout': 30})

    def test_fileargs_lookup(self):
        l = lookup.TemplateLookup(cache_dir=module_base, cache_type='file')
        l.put_string("test", """
                <%!
                    callcount = [0]
                %>
                <%def name="foo()" cached="True">
                    this is foo
                    <%
                    callcount[0] += 1
                    %>
                </%def>

                ${foo()}
                ${foo()}
                ${foo()}
                callcount: ${callcount}
        """)

        t = l.get_template('test')
        m = self._install_mock_cache(t)
        assert result_lines(l.get_template('test').render()) == [
            'this is foo',
            'this is foo',
            'this is foo',
            'callcount: [1]',
        ]
        eq_(m.kwargs, {'dir': module_base, 'type': 'file'})

    def test_buffered(self):
        t = Template("""
        <%!
            def a(text):
                return "this is a " + text.strip()
        %>
        ${foo()}
        ${foo()}
        <%def name="foo()" cached="True" buffered="True">
            this is a test
        </%def>
        """, buffer_filters=["a"])
        self._install_mock_cache(t)
        eq_(
            result_lines(t.render()),
            ["this is a this is a test", "this is a this is a test"]
        )

    def test_load_from_expired(self):
        """test that the cache callable can be called safely after the
        originating template has completed rendering.

        """
        t = Template("""
        ${foo()}
        <%def name="foo()" cached="True" cache_timeout="1">
            foo
        </%def>
        """)
        self._install_mock_cache(t)

        x1 = t.render()
        time.sleep(1.2)
        x2 = t.render()
        assert x1.strip() == x2.strip() == "foo"

    def test_namespace_access(self):
        t = Template("""
            <%def name="foo(x)" cached="True">
                foo: ${x}
            </%def>

            <%
                foo(1)
                foo(2)
                local.cache.invalidate_def('foo')
                foo(3)
                foo(4)
            %>
        """)
        self._install_mock_cache(t)
        eq_(
            result_lines(t.render()),
            ['foo: 1', 'foo: 1', 'foo: 3', 'foo: 3']
        )

    def test_lookup(self):
        l = TemplateLookup(cache_impl='mock')
        l.put_string("x", """
            <%page cached="True" />
            ${y}
        """)
        t = l.get_template("x")
        self._install_mock_cache(t)
        assert result_lines(t.render(y=5)) == ["5"]
        assert result_lines(t.render(y=7)) == ["5"]
        assert isinstance(t.cache.impl, MockCacheImpl)

    def test_invalidate(self):
        t = Template("""
            <%%def name="foo()" cached="True">
                foo: ${x}
            </%%def>

            <%%def name="bar()" cached="True" cache_type='dbm' cache_dir='%s'>
                bar: ${x}
            </%%def>
            ${foo()} ${bar()}
        """ % module_base)
        self._install_mock_cache(t)
        assert result_lines(t.render(x=1)) == ["foo: 1", "bar: 1"]
        assert result_lines(t.render(x=2)) == ["foo: 1", "bar: 1"]
        t.cache.invalidate_def('foo')
        assert result_lines(t.render(x=3)) == ["foo: 3", "bar: 1"]
        t.cache.invalidate_def('bar')
        assert result_lines(t.render(x=4)) == ["foo: 3", "bar: 4"]

        t = Template("""
            <%%page cached="True" cache_type="dbm" cache_dir="%s"/>

            page: ${x}
        """ % module_base)
        self._install_mock_cache(t)
        assert result_lines(t.render(x=1)) == ["page: 1"]
        assert result_lines(t.render(x=2)) == ["page: 1"]
        t.cache.invalidate_body()
        assert result_lines(t.render(x=3)) == ["page: 3"]
        assert result_lines(t.render(x=4)) == ["page: 3"]

    def test_custom_args_def(self):
        t = Template("""
            <%def name="foo()" cached="True" cache_region="myregion"
                    cache_timeout="50" cache_foo="foob">
            </%def>
            ${foo()}
        """)
        m = self._install_mock_cache(t, 'simple')
        t.render()
        eq_(
            m.kwargs,
            {'region': 'myregion',
             'timeout': 50, 'foo': 'foob'})

    def test_custom_args_block(self):
        t = Template("""
            <%block name="foo" cached="True" cache_region="myregion"
                    cache_timeout="50" cache_foo="foob">
            </%block>
        """)
        m = self._install_mock_cache(t, "simple")
        t.render()
        eq_(
            m.kwargs,
            {'region': 'myregion',
             'timeout': 50, 'foo': 'foob'})

    def test_custom_args_page(self):
        t = Template("""
            <%page cached="True" cache_region="myregion"
                    cache_timeout="50" cache_foo="foob"/>
        """)
        m = self._install_mock_cache(t, "simple")
        t.render()
        eq_(
            m.kwargs,
            {'region': 'myregion',
             'timeout': 50, 'foo': 'foob'})

    def test_pass_context(self):
        t = Template("""
            <%page cached="True"/>
        """)
        m = self._install_mock_cache(t)
        t.render()
        assert 'context' not in m.kwargs

        m.pass_context = True
        t.render(x="bar")
        assert 'context' in m.kwargs
        assert m.kwargs['context'].get('x') == 'bar'


class RealBackendTest(object):
    def test_cache_uses_current_context(self):
        t = Template("""
        ${foo()}
        <%def name="foo()" cached="True" cache_timeout="1">
            foo: ${x}
        </%def>
        """)
        self._install_mock_cache(t)

        x1 = t.render(x=1)
        time.sleep(1.2)
        x2 = t.render(x=2)
        eq_(x1.strip(), "foo: 1")
        eq_(x2.strip(), "foo: 2")

    def test_region(self):
        t = Template(
            """
            <%block name="foo" cached="True" cache_region="short">
                short term ${x}
            </%block>
            <%block name="bar" cached="True" cache_region="long">
                long term ${x}
            </%block>
            <%block name="lala">
                none ${x}
            </%block>
        """)

        self._install_mock_cache(t)
        r1 = result_lines(t.render(x=5))
        time.sleep(1.2)
        r2 = result_lines(t.render(x=6))
        r3 = result_lines(t.render(x=7))
        eq_(r1, ["short term 5", "long term 5", "none 5"])
        eq_(r2, ["short term 6", "long term 5", "none 6"])
        eq_(r3, ["short term 6", "long term 5", "none 7"])


class BeakerCacheTest(RealBackendTest, CacheTest):
    real_backend = 'beaker'

    def setUp(self):
        if not beaker_cache.has_beaker:
            raise SkipTest("Beaker is required for these tests.")
        if not py27:
            raise SkipTest("newer beakers not working w/ py26")

    def _install_mock_cache(self, template, implname=None):
        template.cache_args['manager'] = self._regions()
        impl = super(BeakerCacheTest, self)._install_mock_cache(
            template, implname)
        return impl

    def _regions(self):
        return beaker.cache.CacheManager(
            cache_regions={
                'short': {
                    'expire': 1,
                    'type': 'memory'
                },
                'long': {
                    'expire': 60,
                    'type': 'memory'
                }
            }
        )


class DogpileCacheTest(RealBackendTest, CacheTest):
    real_backend = 'dogpile.cache'

    def setUp(self):
        try:
            import dogpile.cache  # noqa
        except ImportError:
            raise SkipTest("dogpile.cache is required to run these tests")

    def _install_mock_cache(self, template, implname=None):
        template.cache_args['regions'] = self._regions()
        template.cache_args.setdefault('region', 'short')
        impl = super(DogpileCacheTest, self)._install_mock_cache(
            template, implname)
        return impl

    def _regions(self):
        from dogpile.cache import make_region

        my_regions = {
            "short": make_region().configure(
                "dogpile.cache.memory",
                expiration_time=1
            ),
            "long": make_region().configure(
                "dogpile.cache.memory",
                expiration_time=60
            ),
            "myregion": make_region().configure(
                "dogpile.cache.memory",
                expiration_time=60
                )
        }

        return my_regions