Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ utils / debug.py

# -*- coding: utf-8 -*-
"""
    celery.utils.debug
    ~~~~~~~~~~~~~~~~~~

    Utilities for debugging memory usage and detecting blocking calls.

"""
from __future__ import absolute_import, print_function, unicode_literals

import os

from contextlib import contextmanager
from functools import partial

from celery.five import range
from celery.platforms import signals

try:
    from psutil import Process
except ImportError:
    Process = None  # noqa

# Names exported by ``from celery.utils.debug import *``.
# Note that :func:`hfloat` is defined in this module but not listed here.
__all__ = [
    'blockdetection', 'sample_mem', 'memdump', 'sample',
    'humanbytes', 'mem_rss', 'ps',
]

# ``(divisor, suffix)`` pairs used by :func:`humanbytes`, ordered from the
# largest unit to the smallest.  The first entry whose divisor does not
# exceed the value wins; a divisor of ``0.0`` means "do not scale".
#
# The suffix is appended verbatim (it is never passed through ``format``),
# so it must be plain text rather than a format template.
UNITS = (
    (2 ** 40.0, 'TB'),
    (2 ** 30.0, 'GB'),
    (2 ** 20.0, 'MB'),
    (2 ** 10.0, 'kB'),
    (0.0, 'b'),
)

# Cached ``psutil.Process`` for the current PID; created lazily by ps().
_process = None
# RSS readings appended by sample_mem() and cleared again by _memdump().
_mem_sample = []


def _on_blocking(signum, frame):
    """Signal handler installed by :func:`blockdetection`.

    Always raises :exc:`RuntimeError`, embedding the frame information
    of ``frame`` (as reported by :func:`inspect.getframeinfo`) in the
    message.  ``signum`` is accepted for the handler signature but unused.

    """
    from inspect import getframeinfo

    location = getframeinfo(frame)
    message = 'Blocking detection timed-out at: {0}'.format(location)
    raise RuntimeError(message)


@contextmanager
def blockdetection(timeout):
    """Context manager for spotting code that blocks for too long.

    While the body runs, ``SIGALRM`` is routed to :func:`_on_blocking`
    and an alarm is armed through ``signals.arm_alarm(timeout)``; the
    value returned by that call is what the ``with`` statement binds.
    On exit the previous handler is put back (when there is one to put
    back) and the alarm is reset.

    :param timeout: Passed to ``signals.arm_alarm``.  A falsy value
        disables detection entirely and no signal state is touched.

    """
    if not timeout:
        # Detection disabled: just run the body.
        yield
        return

    previous = signals['ALRM']
    if previous == _on_blocking:
        # Our own handler is already installed (presumably nested use),
        # so there is nothing to restore afterwards.
        previous = None
    signals['ALRM'] = _on_blocking

    try:
        yield signals.arm_alarm(timeout)
    finally:
        # NOTE(review): a falsy previous handler is left unrestored --
        # confirm this is intended for handlers such as ``SIG_DFL``.
        if previous:
            signals['ALRM'] = previous
        signals.reset_alarm()


def sample_mem():
    """Record the current RSS memory usage and return it.

    The reading (whatever :func:`mem_rss` returns) is appended to the
    module-level sample list; call :func:`memdump` to print the
    collected readings.

    """
    reading = mem_rss()
    _mem_sample.append(reading)
    return reading


def _memdump(samples=10):
    """Collect the data printed by :func:`memdump`.

    Picks at most ``samples`` of the recorded RSS readings, clears the
    recorded readings, forces a garbage collection and then measures
    RSS once more.

    :keyword samples: Maximum number of recorded readings to return.

    :returns: A ``(readings, rss_after_collect)`` tuple where
        ``readings`` is always a list.

    """
    import gc

    recorded = _mem_sample
    if len(recorded) <= samples:
        readings = list(recorded)
    else:
        # sample() is a lazy generator over the very list that is cleared
        # just below, so it must be consumed *before* the reset; otherwise
        # it would yield nothing (and, being a generator, still be truthy).
        readings = list(sample(recorded, samples))
    _mem_sample[:] = []

    gc.collect()
    after_collect = mem_rss()
    return readings, after_collect


def memdump(samples=10, file=None):
    """Print memory statistics.

    Outputs a selection of the RSS readings recorded through
    :func:`sample_mem`, followed by the RSS measured after a
    :func:`gc.collect`.  If :func:`ps` returns :const:`None`, a notice
    that psutil is not installed is printed instead.

    :keyword samples: Forwarded to :func:`_memdump`.
    :keyword file: Passed to :func:`print` as its ``file`` argument.

    """
    def say(text):
        print(text, file=file)

    if ps() is None:
        say('- rss: (psutil not installed).')
        return

    readings, rss_after_gc = _memdump(samples)
    if readings:
        say('- rss (sample):')
        for reading in readings:
            say('-    > {0},'.format(reading))
    say('- rss (end): {0}.'.format(rss_after_gc))


def sample(x, n, k=0):
    """Yield an evenly spaced sample of at most ``n`` items from `x`.

    E.g. if `n` is 10, and `x` has 100 items, every 10th item is
    yielded.  When `x` holds fewer than ``n`` items (from offset ``k``
    on) the remaining items are yielded once each.

    :param x: Indexable sequence to sample from.
    :param n: Maximum number of items to yield; nothing is yielded
        when it is zero or negative.
    :keyword k: Index of the first item to yield.

    """
    if n <= 0:
        # Nothing requested; also avoids dividing by zero below.
        return

    # A step of zero (n > len(x)) would yield x[k] over and over again.
    step = max(len(x) // n, 1)
    for _ in range(n):
        try:
            item = x[k]
        except IndexError:
            # Ran off the end of the sequence.
            break
        yield item
        k += step


def hfloat(f, p=5):
    """Render a number in a form that is pleasant for humans to read.

    Values without a fractional part are returned as :class:`int`;
    anything else is returned as a string formatted with precision ``p``.

    :keyword p: Float precision.

    """
    whole = int(f)
    if whole == f:
        return whole
    return '{0:.{p}}'.format(f, p=p)


def humanbytes(s):
    """Format a byte count using the largest fitting unit (e.g. kB, MB).

    The first entry of ``UNITS`` whose divisor does not exceed ``s`` is
    used; the scaled value is rendered by :func:`hfloat` and the unit
    suffix appended to it.

    """
    fitting = (
        (divisor, suffix) for divisor, suffix in UNITS if s >= divisor
    )
    divisor, suffix = next(fitting)
    # A zero divisor marks the unscaled entry.
    scaled = s / divisor if divisor else s
    return '{0}{1}'.format(hfloat(scaled), suffix)


def mem_rss():
    """Return RSS memory usage as a humanized string.

    :returns: The current process' RSS formatted by :func:`humanbytes`,
        or :const:`None` when :func:`ps` has no process object
        (psutil not installed).

    """
    p = ps()
    if p is None:
        return None
    # psutil renamed ``Process.get_memory_info()`` to ``memory_info()``;
    # prefer the current name and fall back to the legacy one so both
    # old and new psutil releases work.
    memory_info = getattr(p, 'memory_info', None)
    if memory_info is None:
        memory_info = p.get_memory_info
    return humanbytes(memory_info().rss)


def ps():
    """Return the shared :class:`psutil.Process` for this process.

    The instance is created for ``os.getpid()`` on first use and cached
    in a module global.  Returns :const:`None` if :mod:`psutil` is not
    installed.

    """
    global _process
    if _process is not None or Process is None:
        # Either already cached, or psutil is unavailable (stays None).
        return _process
    _process = Process(os.getpid())
    return _process