from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
import logging
import six
from six.moves import range
from six.moves import zip
logger = logging.getLogger(__name__)
# {{{ timing function
def time():
"""Return elapsed CPU time, as a float, in seconds."""
import os
time_opt = os.environ.get("PYTOOLS_LOG_TIME") or "wall"
if time_opt == "wall":
from time import time
return time()
elif time_opt == "rusage":
from resource import getrusage, RUSAGE_SELF
return getrusage(RUSAGE_SELF).ru_utime
else:
raise RuntimeError("invalid timing method '%s'" % time_opt)
# }}}
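
# Usage sketch (illustrative; not part of the original module): the timing
# source is chosen via the PYTOOLS_LOG_TIME environment variable, which
# time() consults on every call. "rusage" needs the Unix-only 'resource'
# module. The function name below is made up for demonstration.
def _example_time_usage():
    import os
    os.environ["PYTOOLS_LOG_TIME"] = "rusage"  # or "wall" (the default)
    t_start = time()
    # ... the computation being timed would go here ...
    return time() - t_start
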
# {{{ abstract logging interface
class LogQuantity(object):
"""A source of loggable scalars."""
sort_weight = 0
def __init__(self, name, unit=None, description=None):
self.name = name
self.unit = unit
self.description = description
@property
def default_aggregator(self):
return None
def tick(self):
"""Perform updates required at every :class:`LogManager` tick."""
pass
def __call__(self):
"""Return the current value of the diagnostic represented by this
:class:`LogQuantity` or None if no value is available.
This is only called if the invocation interval calls for it.
"""
raise NotImplementedError
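
# Illustrative sketch (hypothetical class, not part of the original
# interface): a minimal LogQuantity subclass only needs to override
# __call__; name, unit, and description are passed to the base constructor.
class _ExampleWallTimeQuantity(LogQuantity):
    def __init__(self):
        LogQuantity.__init__(self, "example_wall_time", "s",
                "wall-clock time at each gather")

    def __call__(self):
        from time import time as wall_time
        return wall_time()
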
class PostLogQuantity(LogQuantity):
"""A source of loggable scalars."""
sort_weight = 0
def prepare_for_tick(self):
pass
class MultiLogQuantity(object):
"""A source of multiple loggable scalars."""
sort_weight = 0
def __init__(self, names, units=None, descriptions=None):
self.names = names
if units is None:
units = len(names) * [None]
self.units = units
if descriptions is None:
descriptions = len(names) * [None]
self.descriptions = descriptions
@property
def default_aggregators(self):
return [None] * len(self.names)
def tick(self):
"""Perform updates required at every :class:`LogManager` tick."""
pass
def __call__(self):
"""Return an iterable of the current values of the diagnostic represented
by this :class:`MultiLogQuantity`.
This is only called if the invocation interval calls for it.
"""
raise NotImplementedError
class MultiPostLogQuantity(MultiLogQuantity, PostLogQuantity):
pass
class DtConsumer(object):
def __init__(self, dt):
self.dt = dt
def set_dt(self, dt):
self.dt = dt
class TimeTracker(DtConsumer):
def __init__(self, dt):
DtConsumer.__init__(self, dt)
self.t = 0
def tick(self):
self.t += self.dt
class SimulationLogQuantity(PostLogQuantity, DtConsumer):
"""A source of loggable scalars that needs to know the simulation timestep."""
def __init__(self, dt, name, unit=None, description=None):
PostLogQuantity.__init__(self, name, unit, description)
DtConsumer.__init__(self, dt)
class PushLogQuantity(LogQuantity):
def __init__(self, name, unit=None, description=None):
LogQuantity.__init__(self, name, unit, description)
self.value = None
def push_value(self, value):
if self.value is not None:
raise RuntimeError("can't push two values per cycle")
self.value = value
def __call__(self):
v = self.value
self.value = None
return v
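
# Usage sketch (hypothetical names): a PushLogQuantity is fed at most one
# value per gather cycle from user code via push_value(); __call__ hands
# that value to the log and resets the stored value to None.
def _example_push_quantity_usage():
    residual = PushLogQuantity("residual", None, "solver residual")
    residual.push_value(1.25e-7)  # at most once per cycle
    return residual()  # -> 1.25e-7; a second call would return None
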
class CallableLogQuantityAdapter(LogQuantity):
"""Adapt a 0-ary callable as a :class:`LogQuantity`."""
def __init__(self, callable, name, unit=None, description=None):
self.callable = callable
LogQuantity.__init__(self, name, unit, description)
def __call__(self):
return self.callable()
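
# Usage sketch (hypothetical names): wrap any zero-argument callable as a
# LogQuantity without writing a dedicated subclass.
def _example_callable_adapter_usage():
    import random
    noise = CallableLogQuantityAdapter(
            random.random, "noise", None, "a random sample per gather")
    return noise()  # invokes random.random() and returns its value
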
# }}}
# {{{ manager functionality
class _GatherDescriptor(object):
def __init__(self, quantity, interval):
self.quantity = quantity
self.interval = interval
class _QuantityData(object):
def __init__(self, unit, description, default_aggregator):
self.unit = unit
self.description = description
self.default_aggregator = default_aggregator
def _join_by_first_of_tuple(list_of_iterables):
    loi = [iter(i) for i in list_of_iterables]
if not loi:
return
    key_vals = [next(it) for it in loi]
keys = [kv[0] for kv in key_vals]
values = [kv[1] for kv in key_vals]
target_key = max(keys)
force_advance = False
i = 0
while True:
while keys[i] < target_key or force_advance:
try:
new_key, new_value = next(loi[i])
except StopIteration:
return
assert keys[i] < new_key
keys[i] = new_key
values[i] = new_value
if new_key > target_key:
target_key = new_key
force_advance = False
i += 1
if i >= len(loi):
i = 0
if min(keys) == target_key:
yield target_key, values[:]
force_advance = True
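
# Behavior sketch (made-up data): _join_by_first_of_tuple performs an inner
# join of several (key, value) sequences on their keys, yielding only keys
# that occur in every sequence; keys must be strictly increasing within
# each sequence.
def _example_join_by_first_of_tuple():
    series_a = [(0, "a"), (1, "b"), (2, "c")]
    series_b = [(0, "x"), (2, "z")]
    # yields (0, ["a", "x"]) and (2, ["c", "z"]); key 1 is skipped because
    # series_b has no entry for it
    return list(_join_by_first_of_tuple([series_a, series_b]))
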
def _get_unique_id():
try:
from uuid import uuid1
except ImportError:
try:
import hashlib
checksum = hashlib.md5()
except ImportError:
            # for Python < 2.5 (hashlib is unavailable)
import md5
checksum = md5.new()
from random import Random
rng = Random()
rng.seed()
for i in range(20):
checksum.update(str(rng.randrange(1 << 30)).encode('utf-32'))
return checksum.hexdigest()
else:
return uuid1().hex
def _get_random_suffix(n):
characters = (
[chr(65+i) for i in range(26)]
+ [chr(97+i) for i in range(26)]
+ [chr(48+i) for i in range(10)])
from random import choice
return "".join(choice(characters) for i in range(n))
def _set_up_schema(db_conn):
# initialize new database
db_conn.execute("""
create table quantities (
name text,
unit text,
description text,
default_aggregator blob)""")
db_conn.execute("""
create table constants (
name text,
value blob)""")
db_conn.execute("""
create table warnings (
rank integer,
step integer,
message text,
category text,
filename text,
lineno integer
)""")
schema_version = 2
return schema_version
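
# Usage sketch (illustrative only): the schema can be applied to any DB-API
# connection with an execute() method, e.g. an in-memory SQLite database.
def _example_set_up_schema():
    import sqlite3
    db_conn = sqlite3.connect(":memory:")
    schema_version = _set_up_schema(db_conn)  # creates the three tables above
    db_conn.commit()
    return db_conn, schema_version
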
class LogManager(object):
"""A parallel-capable diagnostic time-series logging facility.
It is meant to log data from a computation, with certain log
quantities available before a cycle, and certain other ones
afterwards. A timeline of invocations looks as follows::

        tick_before()
        compute...
        tick_after()

        tick_before()
        compute...
        tick_after()

        ...

    In a time-dependent simulation, each group of :meth:`tick_before` and
    :meth:`tick_after` calls captures data for a single time state,
    namely that in which the data may have been *before* the "compute"
    step. However, some data (such as the length of the timestep taken
    in a time-adaptive method) may only be available *after* the completion
    of the "compute..." stage, which is why :meth:`tick_after` exists.
    A commented usage sketch follows this docstring.
A :class:`LogManager` logs any number of named time series of floats to
a file. Non-time-series data, in the form of constants, is also
supported and saved.
If MPI parallelism is used, the "head rank" below always refers to
rank 0.
Command line tools called :command:`runalyzer` and :command:`logtool`
(deprecated) are available for looking at the data in a saved log.
"""
def __init__(self, filename=None, mode="r", mpi_comm=None, capture_warnings=True,
commit_interval=90):
"""Initialize this log manager instance.
:param filename: If given, the filename to which this log is bound.
If this database exists, the current state is loaded from it.
:param mode: One of "w", "r" for write, read. "w" assumes that the
database is initially empty. May also be "wu" to indicate that
a unique filename should be chosen automatically.
        :param mpi_comm: A :class:`mpi4py.MPI.Comm`. If given, logs are
            periodically synchronized to the head rank, which then writes them
            out to disk.
:param capture_warnings: Tap the Python warnings facility and save warnings
to the log file.
        :param commit_interval: Actually perform a commit only every
            *commit_interval* times a commit is requested.
"""
assert isinstance(mode, six.string_types), "mode must be a string"
assert mode in ["w", "r", "wu"], "invalid mode"
self.quantity_data = {}
self.last_values = {}
self.before_gather_descriptors = []
self.after_gather_descriptors = []
self.tick_count = 0
self.commit_interval = commit_interval
self.commit_countdown = commit_interval
self.constants = {}
self.last_save_time = time()
# self-timing
self.start_time = time()
self.t_log = 0
# parallel support
self.head_rank = 0
self.mpi_comm = mpi_comm
self.is_parallel = mpi_comm is not None
if mpi_comm is None:
self.rank = 0