Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / pytools   python

Repository URL to install this package:

Version: 2020.3.1 

/ persistent_dict.py

"""Generic persistent, concurrent dictionary-like facility."""

from __future__ import division, with_statement, absolute_import

__copyright__ = """
Copyright (C) 2011,2014 Andreas Kloeckner
Copyright (C) 2017 Matt Wala
"""

__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

import logging

try:
    import collections.abc as abc
except ImportError:
    # Python 2
    import collections as abc

import os
import shutil
import sys
import errno

import six

logger = logging.getLogger(__name__)

__doc__ = """
Persistent Hashing and Persistent Dictionaries
==============================================

This module contains functionality that allows hashing with keys that remain
valid across interpreter invocations, unlike Python's built-in hashes.

This module also provides a disk-backed dictionary that uses persistent hashing.

.. autoexception:: NoSuchEntryError
.. autoexception:: ReadOnlyEntryError

.. autoexception:: CollisionWarning

.. autoclass:: KeyBuilder
.. autoclass:: PersistentDict
.. autoclass:: WriteOncePersistentDict
"""

# Pick the hash constructor bound to ``new_hash``, which KeyBuilder below
# uses to create its hash objects.
try:
    import hashlib
    new_hash = hashlib.sha256
except ImportError:
    # Fallback for Python < 2.5, where importing hashlib fails
    import sha
    new_hash = sha.new


def _make_dir_recursively(dir_):
    """Create the directory *dir_*, including any missing parent directories.

    It is not an error for *dir_* to exist already, as long as it is a
    directory.

    :arg dir_: path of the directory to create.
    :raises OSError: if the directory cannot be created, including the case
        where *dir_* already exists but is not a directory.
    """
    try:
        os.makedirs(dir_)
    except OSError as e:
        from errno import EEXIST
        # EEXIST is also reported when a regular file occupies the path.
        # Only an already-existing *directory* counts as success; anything
        # else would leave the caller without the directory it asked for.
        if e.errno != EEXIST or not os.path.isdir(dir_):
            raise


def update_checksum(checksum, obj):
    """Feed *obj* into *checksum*, encoding it as UTF-8 first if it is text.

    :arg checksum: object with an ``update`` method, such as a hash object.
    :arg obj: a text string (``six.text_type``), or any other value, which
        is passed to ``checksum.update`` unchanged.
    """
    payload = obj.encode("utf8") if isinstance(obj, six.text_type) else obj
    checksum.update(payload)


# {{{ cleanup managers

class CleanupBase(object):
    """Common base class of the cleanup helpers; adds no behavior itself."""


class CleanupManager(CleanupBase):
    """Collects cleanup objects and runs them, most recently registered first.

    Registered objects must provide ``clean_up()`` and ``error_clean_up()``.
    """

    def __init__(self):
        # Kept in reverse registration order: index 0 is the newest entry.
        self.cleanups = []

    def register(self, c):
        """Add *c* in front of all previously registered cleanups."""
        self.cleanups[:0] = [c]

    def clean_up(self):
        """Call ``clean_up()`` on every registered object, newest first."""
        for cleanup in self.cleanups:
            cleanup.clean_up()

    def error_clean_up(self):
        """Call ``error_clean_up()`` on every registered object, newest first."""
        for cleanup in self.cleanups:
            cleanup.error_clean_up()


class LockManager(CleanupBase):
    """Holds a lock in the form of an exclusively created lock file.

    The constructor blocks until the file could be created and then registers
    the instance with the given cleanup manager.
    """

    def __init__(self, cleanup_m, lock_file, stacklevel=0):
        """Create *lock_file* exclusively, retrying once per second.

        :arg cleanup_m: object whose ``register`` method is called with
            *self* once the lock file has been created.
        :arg lock_file: path of the lock file.
        :arg stacklevel: added to the ``stacklevel`` of the warning issued
            while waiting.
        :raises RuntimeError: once more than ``3 * 60`` attempts have failed.
        """
        from time import sleep
        from warnings import warn

        self.lock_file = lock_file

        open_flags = os.O_CREAT | os.O_WRONLY | os.O_EXCL
        failed_tries = 0

        while True:
            try:
                self.fd = os.open(self.lock_file, open_flags)
            except OSError:
                # Every OSError is treated as "not acquired yet": wait, retry.
                pass
            else:
                break

            sleep(1)
            failed_tries += 1

            if failed_tries > 10:
                message = ("could not obtain lock--delete '%s' if necessary"
                        % self.lock_file)
                warn(message, stacklevel=1 + stacklevel)

            if failed_tries > 3 * 60:
                raise RuntimeError(
                        "waited more than three minutes on the lock file '%s'"
                        "--something is wrong" % self.lock_file)

        cleanup_m.register(self)

    def clean_up(self):
        """Close the lock file's descriptor and delete the lock file."""
        os.close(self.fd)
        os.unlink(self.lock_file)

    def error_clean_up(self):
        """Do nothing; the lock is only released by :meth:`clean_up`."""


class ItemDirManager(CleanupBase):
    """Manages one directory and can delete it again when an error occurs."""

    def __init__(self, cleanup_m, path, delete_on_error):
        """Record *path* and register with *cleanup_m*.

        :arg cleanup_m: object whose ``register`` method is called with *self*.
        :arg path: path of the managed directory.
        :arg delete_on_error: if true, :meth:`error_clean_up` removes the
            directory tree at *path*.
        """
        # Whether the directory was already present at construction time.
        self.existed = os.path.isdir(path)
        self.path = path
        self.delete_on_error = delete_on_error

        cleanup_m.register(self)

    def reset(self):
        """Remove the directory tree at :attr:`path`; a missing path is fine."""
        try:
            shutil.rmtree(self.path)
        except OSError as exc:
            if exc.errno == errno.ENOENT:
                return
            raise

    def mkdir(self):
        """Create the directory at :attr:`path`; an existing one is fine."""
        try:
            os.mkdir(self.path)
        except OSError as exc:
            if exc.errno == errno.EEXIST:
                return
            raise

    def clean_up(self):
        """Do nothing on regular cleanup."""

    def error_clean_up(self):
        """Remove the directory if ``delete_on_error`` was requested."""
        if not self.delete_on_error:
            return
        self.reset()

# }}}


# {{{ key generation

class KeyBuilder(object):
    """Turns keys into hex digest strings using ``new_hash``.

    A key of type ``T`` is supported if this class has a method named
    ``update_for_T`` (``T`` being ``type(key).__name__``), or if the key itself
    has a method ``update_persistent_hash(key_hash, key_builder)``.

    Calling an instance with a key returns the key's hex digest.
    """

    def rec(self, key_hash, key):
        """Compute the digest of *key* and feed it into *key_hash*.

        The digest is taken from, in this order: a digest stored on the key
        by an earlier call, the key's own ``update_persistent_hash`` method,
        or the ``update_for_<type name>`` method of this builder. Where the
        key permits attribute assignment, the digest is stored on it as
        ``_pytools_persistent_hash_digest`` for reuse.

        :arg key_hash: hash object receiving the digest via ``update``.
        :arg key: the object to hash.
        :raises TypeError: if none of the above yields a digest.
        """
        # A stored value of None is treated the same as "nothing stored".
        digest = getattr(key, "_pytools_persistent_hash_digest", None)

        if digest is None:
            try:
                hash_itself = key.update_persistent_hash
            except AttributeError:
                pass
            else:
                scratch_hash = new_hash()
                hash_itself(scratch_hash, self)
                digest = scratch_hash.digest()

        if digest is None:
            updater_name = "update_for_" + type(key).__name__
            try:
                updater = getattr(self, updater_name)
            except AttributeError:
                pass
            else:
                scratch_hash = new_hash()
                updater(scratch_hash, key)
                digest = scratch_hash.digest()

        if digest is None:
            raise TypeError("unsupported type for persistent hash keying: %s"
                    % type(key))

        # Digests are never stored on classes, only on instances.
        if not isinstance(key, type):
            try:
                key._pytools_persistent_hash_digest = digest   # noqa pylint:disable=protected-access
            except (AttributeError, TypeError):
                # The key does not allow setting attributes; skip caching.
                pass

        key_hash.update(digest)

    def __call__(self, key):
        """Return the hex digest string for *key*."""
        outer_hash = new_hash()
        self.rec(outer_hash, key)
        return outer_hash.hexdigest()

    # {{{ updaters

    @staticmethod
    def update_for_int(key_hash, key):
        """Hash the UTF-8 encoded ``str()`` form of *key*."""
        text = str(key)
        key_hash.update(text.encode("utf8"))

    # long and bool keys are hashed the same way as int keys
    update_for_long = update_for_bool = update_for_int

    @staticmethod
    def update_for_float(key_hash, key):
        """Hash the UTF-8 encoded ``repr()`` form of *key*."""
        text = repr(key)
        key_hash.update(text.encode("utf8"))

    if sys.version_info < (3,):
        # Python 2: str is hashed as-is, unicode is encoded first

        @staticmethod
        def update_for_str(key_hash, key):
            key_hash.update(key)

        @staticmethod
        def update_for_unicode(key_hash, key):
            key_hash.update(key.encode('utf8'))
    else:
        # Python 3: str is encoded first, bytes is hashed as-is

        @staticmethod
        def update_for_str(key_hash, key):
            key_hash.update(key.encode('utf8'))

        @staticmethod
        def update_for_bytes(key_hash, key):
            key_hash.update(key)

    def update_for_tuple(self, key_hash, key):
        """Hash each entry of the tuple, in order, via :meth:`rec`."""
        for entry in key:
            self.rec(key_hash, entry)

    def update_for_frozenset(self, key_hash, key):
        """Hash the members in ``sorted()`` order via :meth:`rec`."""
        for member in sorted(key):
            self.rec(key_hash, member)

    @staticmethod
    def update_for_NoneType(key_hash, key):  # noqa
        """Hash a fixed marker; the value of *key* is not used."""
        del key
        key_hash.update("<None>".encode('utf8'))

    @staticmethod
    def update_for_dtype(key_hash, key):
        """Hash the UTF-8 encoded ``key.str`` attribute."""
        key_hash.update(key.str.encode('utf8'))

    # }}}

# }}}


# {{{ lru cache

class _LinkedList(object):
    """The list operates on nodes of the form [value, leftptr, rightptr]. To create a
    node of this form you can use `_LinkedList.new_node()`.

    Supports inserting at the left and deleting from an arbitrary location.
    """
    def __init__(self):
        self.count = 0
        self.head = None
        self.end = None

    @staticmethod
    def new_node(element):
        return [element, None, None]

    def __len__(self):
        """Return the value of :attr:`count`, the running node counter."""
        return self.count

    def appendleft_node(self, node):
        self.count += 1

        if self.head is None:
            self.head = self.end = node
            return

        self.head[1] = node
        node[2] = self.head

        self.head = node

    def pop_node(self):
        end = self.end
        self.remove_node(end)
        return end

    def remove_node(self, node):
        """Unlink *node* from the list and decrement the node count.

        :arg node: the ``[value, leftptr, rightptr]`` node to remove. That it
            belongs to this list is only checked, by an assertion, in the
            case where head and end are the same object.
        """
        self.count -= 1

        if self.head is self.end:
            # Head and end coincide (at most one node): the list becomes empty.
            assert node is self.head
            self.head = self.end = None
            return

        left = node[1]
        right = node[2]

        if left is None:
            # No left neighbor: node was the head, so its right neighbor
            # takes over.
            self.head = right
        else:
            left[2] = right

        if right is None:
            # No right neighbor: node was the end, so its left neighbor
            # takes over.
            self.end = left
        else:
            right[1] = left
        # NOTE(review): in the lines visible here, node[1] and node[2] keep
        # pointing at the former neighbors after removal -- confirm whether
        # they are (or should be) cleared before the node is reused.
Loading ...