Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / pytools   python

Repository URL to install this package:

/ log.py

from __future__ import division
from __future__ import absolute_import
from __future__ import print_function

import logging
import six
from six.moves import range
from six.moves import zip
logger = logging.getLogger(__name__)


# {{{ timing function

def time():
    """Return a timestamp in seconds, as a float, for measuring intervals.

    The clock is selected by the ``PYTOOLS_LOG_TIME`` environment variable,
    which is re-read on every call:

    * ``"wall"`` (the default, also used when the variable is unset or
      empty): wall-clock time from :func:`time.time`.
    * ``"rusage"``: user-mode CPU time consumed by this process, taken from
      :func:`resource.getrusage` (``ru_utime``).

    Subtract two return values to obtain an elapsed time.

    :raises RuntimeError: if ``PYTOOLS_LOG_TIME`` names an unknown method.
    """
    import os
    time_opt = os.environ.get("PYTOOLS_LOG_TIME") or "wall"
    if time_opt == "wall":
        # Imported under another name so it does not shadow this function.
        from time import time as wall_time
        return wall_time()
    elif time_opt == "rusage":
        from resource import getrusage, RUSAGE_SELF
        return getrusage(RUSAGE_SELF).ru_utime
    else:
        raise RuntimeError("invalid timing method '%s'" % time_opt)

# }}}


# {{{ abstract logging interface

class LogQuantity(object):
    """Base class for a named source of scalar values to be logged.

    Subclasses supply the value by overriding :meth:`__call__`.
    """

    sort_weight = 0

    def __init__(self, name, unit=None, description=None):
        # Plain attribute storage; unit and description are optional.
        self.name, self.unit, self.description = name, unit, description

    @property
    def default_aggregator(self):
        """Aggregator used for this quantity by default (none here)."""
        return None

    def tick(self):
        """Hook run on every :class:`LogManager` tick; a no-op by default."""

    def __call__(self):
        """Produce the current value of this :class:`LogQuantity`.

        Returns None when no value is available. Only invoked when the
        invocation interval calls for it.
        """
        raise NotImplementedError


class PostLogQuantity(LogQuantity):
    """A :class:`LogQuantity` that also offers a :meth:`prepare_for_tick` hook.

    NOTE(review): the "Post" prefix suggests these quantities are gathered
    after the compute step (see :class:`LogManager`) -- confirm there.
    """

    sort_weight = 0

    def prepare_for_tick(self):
        """Overridable hook; does nothing in this base class."""


class MultiLogQuantity(object):
    """A source of several loggable scalars that are reported together.

    When *units* or *descriptions* is omitted, each defaults to its own list
    holding one ``None`` per entry of *names*.
    """

    sort_weight = 0

    def __init__(self, names, units=None, descriptions=None):
        self.names = names
        # Two independent placeholder lists, so mutating one never
        # affects the other.
        self.units = [None] * len(names) if units is None else units
        self.descriptions = (
                [None] * len(names) if descriptions is None else descriptions)

    @property
    def default_aggregators(self):
        """One default aggregator (``None``) per name."""
        count = len(self.names)
        return [None] * count

    def tick(self):
        """Hook run on every :class:`LogManager` tick; a no-op by default."""

    def __call__(self):
        """Produce an iterable with the current value of each quantity.

        Only invoked when the invocation interval calls for it.
        """
        raise NotImplementedError


class MultiPostLogQuantity(MultiLogQuantity, PostLogQuantity):
    """A :class:`MultiLogQuantity` that also has the :class:`PostLogQuantity`
    interface (:meth:`prepare_for_tick`).

    Because :class:`MultiLogQuantity` comes first in the MRO, its
    ``__init__`` (taking *names*) is the one used.
    """


class DtConsumer(object):
    """Mixin holding a timestep size ``dt`` that can be replaced later."""

    def __init__(self, dt):
        self.dt = dt

    def set_dt(self, dt):
        """Replace the stored timestep with *dt*."""
        self.dt = dt


class TimeTracker(DtConsumer):
    """Accumulate a running time ``t`` by adding ``dt`` on every tick."""

    def __init__(self, dt):
        DtConsumer.__init__(self, dt)
        self.t = 0  # accumulated time starts at zero

    def tick(self):
        """Advance ``t`` by the current ``dt``."""
        self.t += self.dt


class SimulationLogQuantity(PostLogQuantity, DtConsumer):
    """A :class:`PostLogQuantity` that also tracks the simulation timestep ``dt``."""

    def __init__(self, dt, name, unit=None, description=None):
        # Both bases are initialized explicitly (not through super()),
        # quantity part first, then the timestep part.
        PostLogQuantity.__init__(self, name, unit, description)
        DtConsumer.__init__(self, dt)


class PushLogQuantity(LogQuantity):
    """A :class:`LogQuantity` whose value is supplied via :meth:`push_value`.

    Reading the quantity returns the pending value (or None if nothing was
    pushed) and clears it, so each pushed value is reported at most once.
    """

    def __init__(self, name, unit=None, description=None):
        LogQuantity.__init__(self, name, unit, description)
        self.value = None  # pending value; None means "nothing pushed"

    def push_value(self, value):
        """Store *value* to be handed out by the next read.

        :raises RuntimeError: if a (non-None) value is already pending.
        """
        if self.value is None:
            self.value = value
            return
        raise RuntimeError("can't push two values per cycle")

    def __call__(self):
        pending, self.value = self.value, None
        return pending


class CallableLogQuantityAdapter(LogQuantity):
    """Expose a zero-argument callable as a :class:`LogQuantity`.

    Every read calls the wrapped callable and returns its result unchanged.
    """

    def __init__(self, callable, name, unit=None, description=None):
        # The parameter shadows the ``callable`` builtin; its name is kept
        # because callers may pass it by keyword.
        self.callable = callable
        LogQuantity.__init__(self, name, unit, description)

    def __call__(self):
        wrapped = self.callable
        return wrapped()

# }}}


# {{{ manager functionality

class _GatherDescriptor(object):
    def __init__(self, quantity, interval):
        self.quantity = quantity
        self.interval = interval


class _QuantityData(object):
    def __init__(self, unit, description, default_aggregator):
        self.unit = unit
        self.description = description
        self.default_aggregator = default_aggregator


def _join_by_first_of_tuple(list_of_iterables):
    loi = [i.__iter__() for i in list_of_iterables]
    if not loi:
        return
    key_vals = [next(iter) for iter in loi]
    keys = [kv[0] for kv in key_vals]
    values = [kv[1] for kv in key_vals]
    target_key = max(keys)

    force_advance = False

    i = 0
    while True:
        while keys[i] < target_key or force_advance:
            try:
                new_key, new_value = next(loi[i])
            except StopIteration:
                return
            assert keys[i] < new_key
            keys[i] = new_key
            values[i] = new_value
            if new_key > target_key:
                target_key = new_key

            force_advance = False

        i += 1
        if i >= len(loi):
            i = 0

        if min(keys) == target_key:
            yield target_key, values[:]
            force_advance = True


def _get_unique_id():
    try:
        from uuid import uuid1
    except ImportError:
        try:
            import hashlib
            checksum = hashlib.md5()
        except ImportError:
            # for Python << 2.5
            import md5
            checksum = md5.new()

        from random import Random
        rng = Random()
        rng.seed()
        for i in range(20):
            checksum.update(str(rng.randrange(1 << 30)).encode('utf-32'))
        return checksum.hexdigest()
    else:
        return uuid1().hex


def _get_random_suffix(n):
    characters = (
            [chr(65+i) for i in range(26)]
            + [chr(97+i) for i in range(26)]
            + [chr(48+i) for i in range(10)])

    from random import choice
    return "".join(choice(characters) for i in range(n))


def _set_up_schema(db_conn):
    # initialize new database
    db_conn.execute("""
      create table quantities (
        name text,
        unit text,
        description text,
        default_aggregator blob)""")
    db_conn.execute("""
      create table constants (
        name text,
        value blob)""")
    db_conn.execute("""
      create table warnings (
        rank integer,
        step integer,
        message text,
        category text,
        filename text,
        lineno integer
        )""")

    schema_version = 2
    return schema_version


class LogManager(object):
    """A parallel-capable diagnostic time-series logging facility.
    It is meant to log data from a computation, with certain log
    quantities available before a cycle, and certain other ones
    afterwards. A timeline of invocations looks as follows::

        tick_before()
        compute...
        tick()
        tick_after()

        tick_before()
        compute...
        tick_after()

        ...

    In a time-dependent simulation, each group of :meth:`tick_before`
    :meth:`tick_after` calls captures data for a single time state,
    namely that in which the data may have been *before* the "compute"
    step. However, some data (such as the length of the timestep taken
    in a time-adpative method) may only be available *after* the completion
    of the "compute..." stage, which is why :meth:`tick_after` exists.

    A :class:`LogManager` logs any number of named time series of floats to
    a file. Non-time-series data, in the form of constants, is also
    supported and saved.

    If MPI parallelism is used, the "head rank" below always refers to
    rank 0.

    Command line tools called :command:`runalyzer` and :command:`logtool`
    (deprecated) are available for looking at the data in a saved log.
    """

    def __init__(self, filename=None, mode="r", mpi_comm=None, capture_warnings=True,
            commit_interval=90):
        """Initialize this log manager instance.

        :param filename: If given, the filename to which this log is bound.
          If this database exists, the current state is loaded from it.
        :param mode: One of "w", "r" for write, read. "w" assumes that the
          database is initially empty. May also be "wu" to indicate that
          a unique filename should be chosen automatically.
        :arg mpi_comm: A :mod:`mpi4py.MPI.Comm`. If given, logs are
            periodically synchronized to the head node, which then writes them
            out to disk.
        :param capture_warnings: Tap the Python warnings facility and save warnings
          to the log file.
        :param commit_interval: actually perform a commit only every N times a commit
          is requested.
        """

        assert isinstance(mode, six.string_types), "mode must be a string"
        assert mode in ["w", "r", "wu"], "invalid mode"

        self.quantity_data = {}
        self.last_values = {}
        self.before_gather_descriptors = []
        self.after_gather_descriptors = []
        self.tick_count = 0

        self.commit_interval = commit_interval
        self.commit_countdown = commit_interval

        self.constants = {}

        self.last_save_time = time()

        # self-timing
        self.start_time = time()
        self.t_log = 0

        # parallel support
        self.head_rank = 0
        self.mpi_comm = mpi_comm
        self.is_parallel = mpi_comm is not None

        if mpi_comm is None:
            self.rank = 0
Loading ...