"""
Test the memory module.
"""

# Author: Gael Varoquaux <gael dot varoquaux at normalesup dot org>
# Copyright (c) 2009 Gael Varoquaux
# License: BSD Style, 3 clauses.

import gc
import shutil
import os
import os.path
import pickle
import sys
import time
import datetime

import pytest

from joblib.memory import Memory
from joblib.memory import MemorizedFunc, NotMemorizedFunc
from joblib.memory import MemorizedResult, NotMemorizedResult
from joblib.memory import _FUNCTION_HASHES
from joblib.memory import register_store_backend, _STORE_BACKENDS
from joblib.memory import _build_func_identifier, _store_backend_factory
from joblib.memory import JobLibCollisionWarning
from joblib.parallel import Parallel, delayed
from joblib._store_backends import StoreBackendBase, FileSystemStoreBackend
from joblib.test.common import with_numpy, np
from joblib.test.common import with_multiprocessing
from joblib.testing import parametrize, raises, warns
from joblib._compat import PY3_OR_LATER
from joblib.hashing import hash

if sys.version_info[:2] >= (3, 4):
    import pathlib


###############################################################################
# Module-level variables for the tests
def f(x, y=1):
    """ A module-level function for testing purposes.
    """
    return x ** 2 + y


###############################################################################
# Helper function for the tests
def check_identity_lazy(func, accumulator, location):
    """ Given a function and an accumulator (a list that grows every
        time the function is called), check that the function can be
        decorated by memory to be a lazy identity.
    """
    # Call each function with several arguments, and check that it is
    # evaluated only once per argument.
    memory = Memory(location=location, verbose=0)
    func = memory.cache(func)
    for i in range(3):
        for _ in range(2):
            assert func(i) == i
            assert len(accumulator) == i + 1

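# A minimal usage sketch (hedged, and deliberately not named ``test_*`` so
# pytest does not collect it): the accumulator counts real evaluations, so
# check_identity_lazy verifies that repeated calls with the same argument do
# not grow it. ``location`` is assumed to be any writable directory, e.g.
# tmpdir.strpath in the tests below.
def _example_check_identity_lazy(location):
    accumulator = []

    def identity(x):
        accumulator.append(1)
        return x

    check_identity_lazy(identity, accumulator, location)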

def corrupt_single_cache_item(memory):
    single_cache_item, = memory.store_backend.get_items()
    output_filename = os.path.join(single_cache_item.path, 'output.pkl')
    with open(output_filename, 'w') as f:
        f.write('garbage')


def monkeypatch_cached_func_warn(func, monkeypatch_fixture):
    # Need monkeypatch because pytest does not
    # capture stdlib logging output (see
    # https://github.com/pytest-dev/pytest/issues/2079)

    recorded = []

    def append_to_record(item):
        recorded.append(item)
    monkeypatch_fixture.setattr(func, 'warn', append_to_record)
    return recorded

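# Hedged sketch of how the helper above is meant to be used (see
# test_memory_numpy_check_mmap_mode further down for the real use):
# ``cached_func`` is assumed to be a MemorizedFunc returned by memory.cache,
# whose ``warn`` method (inherited from joblib's Logger) gets replaced so the
# emitted messages can be asserted on directly.
def _example_recorded_warnings(cached_func, monkeypatch_fixture, arg):
    recorded = monkeypatch_cached_func_warn(cached_func, monkeypatch_fixture)
    cached_func(arg)  # any warning emitted during this call lands in `recorded`
    return recorded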

###############################################################################
# Tests
def test_memory_integration(tmpdir):
    """ Simple test of memory lazy evaluation.
    """
    accumulator = list()
    # Rmk: this function has the same name as a module-level function,
    # thus it serves as a test that the two are identified as different.

    def f(l):
        accumulator.append(1)
        return l

    check_identity_lazy(f, accumulator, tmpdir.strpath)

    # Now test clearing
    for compress in (False, True):
        for mmap_mode in ('r', None):
            memory = Memory(location=tmpdir.strpath, verbose=10,
                            mmap_mode=mmap_mode, compress=compress)
            # First clear the cache directory, to check that our code can
            # handle that
            # NOTE: this line would raise an exception, as the database file is
            # still open; we ignore the error since we want to test what
            # happens if the directory disappears
            shutil.rmtree(tmpdir.strpath, ignore_errors=True)
            g = memory.cache(f)
            g(1)
            g.clear(warn=False)
            current_accumulator = len(accumulator)
            out = g(1)

        assert len(accumulator) == current_accumulator + 1
        # Also, check that Memory.eval works similarly
        assert memory.eval(f, 1) == out
        assert len(accumulator) == current_accumulator + 1

    # Now do a smoke test with a function defined in __main__, as the name
    # mangling rules are more complex
    f.__module__ = '__main__'
    memory = Memory(location=tmpdir.strpath, verbose=0)
    memory.cache(f)(1)

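# Hedged note on Memory.eval, exercised in the test above: memory.eval(f, x)
# is assumed to behave like memory.cache(f)(x), i.e. the call is hashed and
# served from the cache when an identical call has already been stored.
def _example_memory_eval(location):
    memory = Memory(location=location, verbose=0)
    # f is the module-level test function: f(2) == 2 ** 2 + 1 == 5
    assert memory.eval(f, 2) == memory.cache(f)(2) == f(2) == 5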

def test_no_memory():
    """ Test memory with location=None: no memoize """
    accumulator = list()

    def ff(l):
        accumulator.append(1)
        return l

    memory = Memory(location=None, verbose=0)
    gg = memory.cache(ff)
    for _ in range(4):
        current_accumulator = len(accumulator)
        gg(1)
        assert len(accumulator) == current_accumulator + 1

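# Hedged sketch of what the test above relies on: with location=None,
# Memory.cache is assumed to return a NotMemorizedFunc wrapper (imported at
# the top of this module) that simply calls through without caching anything.
def _example_no_memory_wrapper():
    memory = Memory(location=None, verbose=0)
    wrapped = memory.cache(f)  # module-level f, defined above
    assert isinstance(wrapped, NotMemorizedFunc)
    assert wrapped(2) == f(2) == 5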

def test_memory_kwarg(tmpdir):
    " Test memory with a function with keyword arguments."
    accumulator = list()

    def g(l=None, m=1):
        accumulator.append(1)
        return l

    check_identity_lazy(g, accumulator, tmpdir.strpath)

    memory = Memory(location=tmpdir.strpath, verbose=0)
    g = memory.cache(g)
    # Smoke test with an explicit keyword argument:
    assert g(l=30, m=2) == 30


def test_memory_lambda(tmpdir):
    " Test memory with a function with a lambda."
    accumulator = list()

    def helper(x):
        """ A helper function to define l as a lambda.
        """
        accumulator.append(1)
        return x

    l = lambda x: helper(x)

    check_identity_lazy(l, accumulator, tmpdir.strpath)


def test_memory_name_collision(tmpdir):
    " Check that name collisions with functions will raise warnings"
    memory = Memory(location=tmpdir.strpath, verbose=0)

    @memory.cache
    def name_collision(x):
        """ A first function called name_collision
        """
        return x

    a = name_collision

    @memory.cache
    def name_collision(x):
        """ A second function called name_collision
        """
        return x

    b = name_collision

    with warns(JobLibCollisionWarning) as warninfo:
        a(1)
        b(1)

    assert len(warninfo) == 1
    assert "collision" in str(warninfo[0].message)


def test_memory_warning_lambda_collisions(tmpdir):
    # Check that multiple uses of lambdas will raise collision warnings
    memory = Memory(location=tmpdir.strpath, verbose=0)
    a = lambda x: x
    a = memory.cache(a)
    b = lambda x: x + 1
    b = memory.cache(b)

    with warns(JobLibCollisionWarning) as warninfo:
        assert a(0) == 0
        assert b(1) == 2
        assert a(1) == 1

    # In recent Python versions, we can retrieve the code of lambdas,
    # thus nothing is raised
    assert len(warninfo) == 4


def test_memory_warning_collision_detection(tmpdir):
    # Check that collisions impossible to detect will raise appropriate
    # warnings.
    memory = Memory(location=tmpdir.strpath, verbose=0)
    a1 = eval('lambda x: x')
    a1 = memory.cache(a1)
    b1 = eval('lambda x: x+1')
    b1 = memory.cache(b1)

    with warns(JobLibCollisionWarning) as warninfo:
        a1(1)
        b1(1)
        a1(0)

    assert len(warninfo) == 2
    assert "cannot detect" in str(warninfo[0].message).lower()


def test_memory_partial(tmpdir):
    " Test memory with functools.partial."
    accumulator = list()

    def func(x, y):
        """ A helper function to define l as a lambda.
        """
        accumulator.append(1)
        return y

    import functools
    function = functools.partial(func, 1)

    check_identity_lazy(function, accumulator, tmpdir.strpath)


def test_memory_eval(tmpdir):
    " Smoke test memory with a function with a function defined in an eval."
    memory = Memory(location=tmpdir.strpath, verbose=0)

    m = eval('lambda x: x')
    mm = memory.cache(m)

    assert mm(1) == 1


def count_and_append(x=[]):
    """ A function with a side effect in its arguments.

        Return the length of its argument and append one element.
    """
    len_x = len(x)
    x.append(None)
    return len_x


def test_argument_change(tmpdir):
    """ Check that if a function has a side effect in its arguments, it
        should use the hash of changing arguments.
    """
    memory = Memory(location=tmpdir.strpath, verbose=0)
    func = memory.cache(count_and_append)
    # call the function for the first time; it should be cached with
    # argument x=[]
    assert func() == 0
    # the second time, the argument is x=[None], which is not cached
    # yet, so the function should be called a second time
    assert func() == 1

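# Hedged illustration of why the test above sees two evaluations: the mutable
# default argument is part of the hashed call, so once it has been mutated the
# call no longer maps to the existing cache entry. joblib's hash (imported at
# the top) is assumed to be representative of how cache keys are derived.
def _example_argument_hash_changes():
    assert hash([]) != hash([None])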

@with_numpy
@parametrize('mmap_mode', [None, 'r'])
def test_memory_numpy(tmpdir, mmap_mode):
    " Test memory with a function with numpy arrays."
    accumulator = list()

    def n(l=None):
        accumulator.append(1)
        return l

    memory = Memory(location=tmpdir.strpath, mmap_mode=mmap_mode,
                    verbose=0)
    cached_n = memory.cache(n)

    rnd = np.random.RandomState(0)
    for i in range(3):
        a = rnd.random_sample((10, 10))
        for _ in range(3):
            assert np.all(cached_n(a) == a)
            assert len(accumulator) == i + 1


@with_numpy
def test_memory_numpy_check_mmap_mode(tmpdir, monkeypatch):
    """Check that mmap_mode is respected even at the first call"""

    memory = Memory(location=tmpdir.strpath, mmap_mode='r', verbose=0)

    @memory.cache()
    def twice(a):
        return a * 2

    a = np.ones(3)

    b = twice(a)
    c = twice(a)

    assert isinstance(c, np.memmap)
    assert c.mode == 'r'

    assert isinstance(b, np.memmap)
    assert b.mode == 'r'

    # Corrupt the file. Deleting the b and c memmaps first
    # is necessary to be able to edit the file.
    del b
    del c
    gc.collect()
    corrupt_single_cache_item(memory)

    # Make sure that corrupting the file causes recomputation and that
    # a warning is issued.
    recorded_warnings = monkeypatch_cached_func_warn(twice, monkeypatch)
    d = twice(a)
    assert len(recorded_warnings) == 1
    exception_msg = 'Exception while loading results'
    assert exception_msg in recorded_warnings[0]
    # The recomputed result should still honor mmap_mode='r'.
    assert isinstance(d, np.memmap)
    assert d.mode == 'r'