Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / joblib   python

Repository URL to install this package:

/ memory.py

"""
A context object for caching a function's return value each time it
is called with the same input arguments.

"""

# Author: Gael Varoquaux <gael dot varoquaux at normalesup dot org>
# Copyright (c) 2009 Gael Varoquaux
# License: BSD Style, 3 clauses.


from __future__ import with_statement
import os
import time
import pydoc
import re
import functools
import traceback
import warnings
import inspect
import sys
import weakref

# Local imports
from . import hashing
from .func_inspect import get_func_code, get_func_name, filter_args
from .func_inspect import format_call
from .func_inspect import format_signature
from ._memory_helpers import open_py_source
from .logger import Logger, format_time, pformat
from ._compat import _basestring, PY3_OR_LATER
from ._store_backends import StoreBackendBase, FileSystemStoreBackend

if sys.version_info[:2] >= (3, 4):
    import pathlib


# Prefix of the optional leading line of a function-code text. When a code
# text starts with this prefix, the remainder of that first line is parsed as
# the function's first line number (see extract_first_line).
FIRST_LINE_TEXT = "# first line:"

# TODO: The following object should have a data store object as a sub
# object, and the interface to persist and query should be separated in
# the data store.
#
# This would enable creating 'Memory' objects with a different logic for
# pickling that would simply spawn a MemorizedFunc with the same
# store (or do we want to copy it to avoid cross-talks?), for instance to
# implement HDF5 pickling.

# TODO: Same remark for the logger, and probably use the Python logging
# mechanism.


def extract_first_line(func_code):
    """Split the optional first-line marker off a function-code text.

    Parameters
    ----------
    func_code: str
        The code text, possibly starting with a ``FIRST_LINE_TEXT`` header
        line that carries a line number.

    Returns
    -------
    (func_code, first_line): tuple
        The code text without its header line and the line number read
        from that header. When the text has no header, it is returned
        unchanged together with -1.
    """
    if not func_code.startswith(FIRST_LINE_TEXT):
        # No header line: nothing to strip, line number unknown.
        return func_code, -1

    # The header is everything up to the first newline; the rest is the code.
    header, _, remaining_code = func_code.partition('\n')
    first_line = int(header[len(FIRST_LINE_TEXT):])
    return remaining_code, first_line


class JobLibCollisionWarning(UserWarning):
    """Warn that there might be a collision between names of functions.

    This is a plain UserWarning subclass, so it can be filtered as its own
    category with the standard ``warnings`` machinery.
    """


# Registry of the available store backends: maps a backend name to the
# backend class to instantiate. register_store_backend() adds entries to it
# and _store_backend_factory() looks backends up in it by name.
_STORE_BACKENDS = {'local': FileSystemStoreBackend}


def register_store_backend(backend_name, backend):
    """Extend available store backends.

    The Memory, MemorizedResult and MemorizedFunc objects are designed to be
    agnostic to the type of store used behind. By default, the local file
    system is used but this function gives the possibility to extend joblib's
    memory pattern with other types of storage such as cloud storage (S3, GCS,
    OpenStack, HadoopFS, etc) or blob DBs.

    Parameters
    ----------
    backend_name: str
        The name identifying the store backend being registered. For example,
        'local' is used with FileSystemStoreBackend.
    backend: StoreBackendBase subclass
        A class (not an instance) that implements the StoreBackendBase
        interface.

    Raises
    ------
    ValueError
        If ``backend_name`` is not a string, or if ``backend`` is not a
        class inheriting from StoreBackendBase.

    """
    if not isinstance(backend_name, _basestring):
        raise ValueError("Store backend name should be a string, "
                         "'{0}' given.".format(backend_name))

    # issubclass() raises a TypeError when its first argument is not a class
    # (e.g. a backend instance or a string). Check for a class first so that
    # every invalid backend is reported with the same ValueError.
    if not (inspect.isclass(backend)
            and issubclass(backend, StoreBackendBase)):
        raise ValueError("Store backend should inherit "
                         "StoreBackendBase, "
                         "'{0}' given.".format(backend))

    _STORE_BACKENDS[backend_name] = backend


def _store_backend_factory(backend, location, verbose=0, backend_options=None):
    """Return the correct store object for the given location.

    Parameters
    ----------
    backend: str
        Name of a backend registered in ``_STORE_BACKENDS``.
    location: str, pathlib.Path, StoreBackendBase instance or None
        Where the store lives. A StoreBackendBase instance is returned
        as is. A pathlib.Path is converted to a string first.
    verbose: int, optional
        Verbosity level forwarded to the backend's ``configure``.
    backend_options: dict or None, optional
        Extra options forwarded to the backend's ``configure``. None is
        replaced by an empty dict.

    Returns
    -------
    A configured store backend instance, or None when ``location`` is None
    or of an unsupported type (a UserWarning is emitted in the latter case).

    Raises
    ------
    TypeError
        If ``location`` is a string but no backend is registered under the
        name ``backend``.
    """
    if backend_options is None:
        backend_options = {}

    if (sys.version_info[:2] >= (3, 4) and isinstance(location, pathlib.Path)):
        location = str(location)

    if isinstance(location, StoreBackendBase):
        # Already a store backend object: nothing to build.
        return location

    if isinstance(location, _basestring):
        location = os.path.expanduser(location)

        # Look the backend class up by name in the registry.
        try:
            backend_cls = _STORE_BACKENDS.get(backend)
        except TypeError:
            # Unhashable name: it cannot match any registered backend.
            backend_cls = None

        # There is no implicit fallback: an unregistered backend name is an
        # error.
        if backend_cls is None:
            raise TypeError('Unknown location {0} or backend {1}'.format(
                            location, backend))

        # The store backend is configured with the extra named parameters,
        # some of them are specific to the underlying store backend.
        obj = backend_cls()
        obj.configure(location, verbose=verbose,
                      backend_options=backend_options)
        return obj

    if location is not None:
        warnings.warn(
            "Instanciating a backend using a {} as a location is not "
            "supported by joblib. Returning None instead.".format(
                location.__class__.__name__), UserWarning)

    return None


def _get_func_fullname(func):
    """Compute the path-like full name associated with a function.

    The module components returned by ``get_func_name`` are followed by the
    function name and joined with ``os.path.join``.
    """
    path_parts, name = get_func_name(func)
    path_parts.extend([name])
    return os.path.join(*path_parts)


def _build_func_identifier(func):
    """Build a roughly unique identifier for the cached function.

    A string is used as the identifier directly; any other object is
    converted with ``_get_func_fullname``.
    """
    if isinstance(func, _basestring):
        identifier = func
    else:
        identifier = _get_func_fullname(func)

    # We reuse historical fs-like way of building a function identifier
    return os.path.join(identifier)


def _format_load_msg(func_id, args_id, timestamp=None, metadata=None):
    """ Helper function to format the message when loading the results.
    """
    signature = ""
    try:
        if metadata is not None:
            args = ", ".join(['%s=%s' % (name, value)
                              for name, value
                              in metadata['input_args'].items()])
            signature = "%s(%s)" % (os.path.basename(func_id), args)
        else:
            signature = os.path.basename(func_id)
    except KeyError:
        pass

    if timestamp is not None:
        ts_string = "{0: <16}".format(format_time(time.time() - timestamp))
    else:
        ts_string = ""
    return '[Memory]{0}: Loading {1}'.format(ts_string, str(signature))


# An in-memory store to avoid looking at the disk-based function
# source code to check if a function definition has changed.
# The mapping holds its keys weakly, so an entry does not by itself keep
# its key object alive.
_FUNCTION_HASHES = weakref.WeakKeyDictionary()


###############################################################################
# class `MemorizedResult`
###############################################################################
class MemorizedResult(Logger):
    """Handle on a value stored in the joblib cache.

    Parameters
    ----------
    location: str
        The location of joblib cache. Depends on the store backend used.

    func: function or str
        Function whose output is cached. The string case is intended only
        for instantiation based on the output of repr() on another instance
        (namely eval(repr(memorized_instance)) works).

    args_id: str
        Identifier (hash) of the function arguments. Also readable through
        the deprecated ``argument_hash`` property.

    backend: str
        Type of store backend for reading/writing cache files.
        Default is 'local'.

    mmap_mode: {None, 'r+', 'r', 'w+', 'c'}
        The memmapping mode used when loading from cache numpy arrays. See
        numpy.load for the meaning of the different values.

    verbose: int
        Verbosity level (0 means no message).

    timestamp, metadata:
        For internal use only. When ``metadata`` is not given, it is read
        from the store backend.
    """
    def __init__(self, location, func, args_id, backend='local',
                 mmap_mode=None, verbose=0, timestamp=None, metadata=None):
        Logger.__init__(self)
        self.func_id = _build_func_identifier(func)
        # Keep the caller's string as is; otherwise expose the identifier.
        self.func = func if isinstance(func, _basestring) else self.func_id
        self.args_id = args_id
        self.store_backend = _store_backend_factory(backend, location,
                                                    verbose=verbose)
        self.mmap_mode = mmap_mode

        if metadata is None:
            # No metadata supplied: fetch the one stored with the item.
            metadata = self.store_backend.get_metadata(
                [self.func_id, self.args_id])
        self.metadata = metadata

        self.duration = self.metadata.get('duration', None)
        self.verbose = verbose
        self.timestamp = timestamp

    @property
    def argument_hash(self):
        """Deprecated alias of ``args_id``; emits a DeprecationWarning."""
        warnings.warn(
            "The 'argument_hash' attribute has been deprecated in version "
            "0.12 and will be removed in version 0.14.\n"
            "Use `args_id` attribute instead.",
            DeprecationWarning, stacklevel=2)
        return self.args_id

    def get(self):
        """Read value from cache and return it.

        Raises
        ------
        KeyError
            If the store backend fails to load the item with a ValueError
            or a KeyError. The original exception is attached as
            ``__cause__``.
        """
        msg = None
        if self.verbose:
            msg = _format_load_msg(self.func_id, self.args_id,
                                   timestamp=self.timestamp,
                                   metadata=self.metadata)

        item_path = [self.func_id, self.args_id]
        try:
            return self.store_backend.load_item(
                item_path, msg=msg, verbose=self.verbose)
        except (ValueError, KeyError) as exc:
            # KeyError is expected under Python 2.7, ValueError under Python 3
            corrupted_folder = os.path.join(
                self.store_backend.location, self.func_id, self.args_id)
            new_exc = KeyError(
                "Error while trying to load a MemorizedResult's value. "
                "It seems that this folder is corrupted : {}".format(
                    corrupted_folder))
            # Chain manually: `raise ... from ...` is not valid Python 2.
            new_exc.__cause__ = exc
            raise new_exc

    def clear(self):
        """Clear value from cache"""
        self.store_backend.clear_item([self.func_id, self.args_id])

    def __repr__(self):
        template = ('{class_name}(location="{location}", func="{func}", '
                    'args_id="{args_id}")')
        fields = dict(class_name=self.__class__.__name__,
                      location=self.store_backend.location,
                      func=self.func,
                      args_id=self.args_id)
        return template.format(**fields)

    def __getstate__(self):
        # The timestamp is not carried over in the pickled state.
        state = dict(self.__dict__)
        state['timestamp'] = None
        return state


class NotMemorizedResult(object):
    """In-memory holder for an arbitrary value.

    This class is a replacement for MemorizedResult when there is no cache:
    the value is kept on the instance until ``clear`` is called.
    """
    __slots__ = ('value', 'valid')

    def __init__(self, value):
        self.value = value
        self.valid = True

    def get(self):
        """Return the held value, or raise KeyError once it was cleared."""
        if not self.valid:
            raise KeyError("No value stored.")
        return self.value

    def clear(self):
        """Drop the held value and mark the instance as invalid."""
        self.valid = False
        self.value = None

    def __repr__(self):
        cls_name = self.__class__.__name__
        if not self.valid:
            return cls_name + ' with no value'
        return '{class_name}({value})'.format(class_name=cls_name,
                                              value=pformat(self.value))

    # __getstate__ and __setstate__ are required because of __slots__
    def __getstate__(self):
        return dict(valid=self.valid, value=self.value)

    def __setstate__(self, state):
        self.valid = state["valid"]
        self.value = state["value"]


###############################################################################
# class `NotMemorizedFunc`
###############################################################################
class NotMemorizedFunc(object):
    """No-op object decorating a function.

    This class replaces MemorizedFunc when there is no cache. It provides an
    identical API but does not write anything on disk.

    Attributes
Loading ...