Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
pantsbuild.pants / pantsd / process_manager.py
Size: Mime:
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import functools
import logging
import os
import signal
import subprocess
import time
import traceback
from contextlib import contextmanager
from typing import Optional

import psutil

from pants.base.build_environment import get_buildroot
from pants.process.lock import OwnerPrintingInterProcessFileLock
from pants.process.subprocess import Subprocess
from pants.util.dirutil import read_file, rm_rf, safe_file_dump, safe_mkdir
from pants.util.memo import memoized_property

logger = logging.getLogger(__name__)


@contextmanager
def swallow_psutil_exceptions():
    """A contextmanager that swallows standard psutil access exceptions."""
    try:
        yield
    except (psutil.AccessDenied, psutil.NoSuchProcess):
        # This masks common, but usually benign psutil process access exceptions that might be seen
        # when accessing attributes/methods on psutil.Process objects.
        pass


class ProcessGroup:
    """Wraps a logical group of processes and provides convenient access to ProcessManager
    objects."""

    def __init__(self, name, metadata_base_dir=None):
        self._name = name
        self._metadata_base_dir = metadata_base_dir

    def _instance_from_process(self, process):
        """Default converter from psutil.Process to process instance classes for subclassing."""
        return ProcessManager(
            name=process.name(),
            pid=process.pid,
            process_name=process.name(),
            metadata_base_dir=self._metadata_base_dir,
        )

    def iter_processes(self, proc_filter=None):
        """Yields processes from psutil.process_iter with an optional filter and swallows psutil
        errors.

        If a psutil exception is raised during execution of the filter, that process will not be
        yielded but subsequent processes will. On the other hand, if psutil.process_iter raises an
        exception, no more processes will be yielded.
        """
        with swallow_psutil_exceptions():  # process_iter may raise
            for proc in psutil.process_iter():
                with swallow_psutil_exceptions():  # proc_filter may raise
                    if (proc_filter is None) or proc_filter(proc):
                        yield proc

    def iter_instances(self, *args, **kwargs):
        for item in self.iter_processes(*args, **kwargs):
            yield self._instance_from_process(item)


class ProcessMetadataManager:
    """"Manages contextual, on-disk process metadata."""

    class MetadataError(Exception):
        pass

    class Timeout(Exception):
        pass

    FAIL_WAIT_SEC = 10
    INFO_INTERVAL_SEC = 5
    WAIT_INTERVAL_SEC = 0.1

    def __init__(self, metadata_base_dir=None):
        """
        :param str metadata_base_dir: The base directory for process metadata.
        """
        super().__init__()

        self._metadata_base_dir = (
            metadata_base_dir or Subprocess.Factory.global_instance().create().get_subprocess_dir()
        )

    @staticmethod
    def _maybe_cast(item, caster):
        """Given a casting function, attempt to cast to that type while masking common cast
        exceptions.

        N.B. This is mostly suitable for casting string types to numeric types - e.g. a port number
        read from disk into an int.

        :param func caster: A casting callable (e.g. `int`).
        :returns: The result of caster(item) or item if TypeError or ValueError are raised during cast.
        """
        try:
            return caster(item)
        except (TypeError, ValueError):
            # N.B. the TypeError catch here (already) protects against the case that caster is None.
            return item

    @classmethod
    def _deadline_until(
        cls,
        closure,
        action_msg,
        timeout=FAIL_WAIT_SEC,
        wait_interval=WAIT_INTERVAL_SEC,
        info_interval=INFO_INTERVAL_SEC,
    ):
        """Execute a function/closure repeatedly until a True condition or timeout is met.

        :param func closure: the function/closure to execute (should not block for long periods of time
                             and must return True on success).
        :param str action_msg: a description of the action that is being executed, to be rendered as
                               info while we wait, and as part of any rendered exception.
        :param float timeout: the maximum amount of time to wait for a true result from the closure in
                              seconds. N.B. this is timing based, so won't be exact if the runtime of
                              the closure exceeds the timeout.
        :param float wait_interval: the amount of time to sleep between closure invocations.
        :param float info_interval: the amount of time to wait before and between reports via info
                                    logging that we're still waiting for the closure to succeed.
        :raises: :class:`ProcessManager.Timeout` on execution timeout.
        """
        now = time.time()
        deadline = now + timeout
        info_deadline = now + info_interval
        while 1:
            if closure():
                return True

            now = time.time()
            if now > deadline:
                raise cls.Timeout(
                    "exceeded timeout of {} seconds while waiting for {}".format(
                        timeout, action_msg
                    )
                )

            if now > info_deadline:
                logger.info("waiting for {}...".format(action_msg))
                info_deadline = info_deadline + info_interval
            elif wait_interval:
                time.sleep(wait_interval)

    @classmethod
    def _wait_for_file(cls, filename, timeout=FAIL_WAIT_SEC, want_content=True):
        """Wait up to timeout seconds for filename to appear with a non-zero size or raise
        Timeout()."""

        def file_waiter():
            return os.path.exists(filename) and (not want_content or os.path.getsize(filename))

        action_msg = "file {} to appear".format(filename)
        return cls._deadline_until(file_waiter, action_msg, timeout=timeout)

    @staticmethod
    def _get_metadata_dir_by_name(name, metadata_base_dir):
        """Retrieve the metadata dir by name.

        This should always live outside of the workdir to survive a clean-all.
        """
        return os.path.join(metadata_base_dir, name)

    def _maybe_init_metadata_dir_by_name(self, name):
        """Initialize the metadata directory for a named identity if it doesn't exist."""
        safe_mkdir(self.__class__._get_metadata_dir_by_name(name, self._metadata_base_dir))

    def _metadata_file_path(self, name, metadata_key):
        return self.metadata_file_path(name, metadata_key, self._metadata_base_dir)

    @classmethod
    def metadata_file_path(cls, name, metadata_key, metadata_base_dir):
        return os.path.join(cls._get_metadata_dir_by_name(name, metadata_base_dir), metadata_key)

    def read_metadata_by_name(self, name, metadata_key, caster=None):
        """Read process metadata using a named identity.

        :param string name: The ProcessMetadataManager identity/name (e.g. 'pantsd').
        :param string metadata_key: The metadata key (e.g. 'pid').
        :param func caster: A casting callable to apply to the read value (e.g. `int`).
        """
        file_path = self._metadata_file_path(name, metadata_key)
        try:
            metadata = read_file(file_path).strip()
            return self._maybe_cast(metadata, caster)
        except (IOError, OSError):
            return None

    def write_metadata_by_name(self, name, metadata_key, metadata_value):
        """Write process metadata using a named identity.

        :param string name: The ProcessMetadataManager identity/name (e.g. 'pantsd').
        :param string metadata_key: The metadata key (e.g. 'pid').
        :param string metadata_value: The metadata value (e.g. '1729').
        """
        self._maybe_init_metadata_dir_by_name(name)
        file_path = self._metadata_file_path(name, metadata_key)
        safe_file_dump(file_path, metadata_value)

    def await_metadata_by_name(self, name, metadata_key, timeout, caster=None):
        """Block up to a timeout for process metadata to arrive on disk.

        :param string name: The ProcessMetadataManager identity/name (e.g. 'pantsd').
        :param string metadata_key: The metadata key (e.g. 'pid').
        :param int timeout: The deadline to write metadata.
        :param type caster: A type-casting callable to apply to the read value (e.g. int, str).
        :returns: The value of the metadata key (read from disk post-write).
        :raises: :class:`ProcessMetadataManager.Timeout` on timeout.
        """
        file_path = self._metadata_file_path(name, metadata_key)
        self._wait_for_file(file_path, timeout=timeout)
        return self.read_metadata_by_name(name, metadata_key, caster)

    def purge_metadata_by_name(self, name):
        """Purge a processes metadata directory.

        :raises: `ProcessManager.MetadataError` when OSError is encountered on metadata dir removal.
        """
        meta_dir = self._get_metadata_dir_by_name(name, self._metadata_base_dir)
        logger.debug("purging metadata directory: {}".format(meta_dir))
        try:
            rm_rf(meta_dir)
        except OSError as e:
            raise ProcessMetadataManager.MetadataError(
                "failed to purge metadata directory {}: {!r}".format(meta_dir, e)
            )


class ProcessManager(ProcessMetadataManager):
    """Subprocess/daemon management mixin/superclass.

    Not intended to be thread-safe.
    """

    class InvalidCommandOutput(Exception):
        pass

    class NonResponsiveProcess(Exception):
        pass

    class ExecutionError(Exception):
        def __init__(self, message, output=None):
            super(ProcessManager.ExecutionError, self).__init__(message)
            self.message = message
            self.output = output

        def __repr__(self):
            return "{}(message={!r}, output={!r})".format(
                type(self).__name__, self.message, self.output
            )

    KILL_WAIT_SEC = 5
    KILL_CHAIN = (signal.SIGTERM, signal.SIGKILL)

    def __init__(
        self,
        name,
        pid=None,
        socket=None,
        process_name=None,
        socket_type=int,
        metadata_base_dir=None,
    ):
        """
        :param string name: The process identity/name (e.g. 'pantsd' or 'ng_Zinc').
        :param int pid: The process pid. Overrides fetching of the self.pid @property.
        :param string socket: The socket metadata. Overrides fetching of the self.socket @property.
        :param string process_name: The process name for cmdline executable name matching.
        :param type socket_type: The type to be used for socket type casting (e.g. int).
        :param str metadata_base_dir: The overridden base directory for process metadata.
        """
        super().__init__(metadata_base_dir)
        self._name = name.lower().strip()
        self._pid = pid
        self._socket = socket
        self._socket_type = socket_type
        self._process_name = process_name
        self._buildroot = get_buildroot()
        self._process = None

    @property
    def name(self):
        """The logical name/label of the process."""
        return self._name

    @property
    def process_name(self):
        """The logical process name.

        If defined, this is compared to exe_name for stale pid checking.
        """
        return self._process_name

    @memoized_property
    def lifecycle_lock(self):
        """An identity-keyed inter-process lock for safeguarding lifecycle and other operations."""
        safe_mkdir(self._metadata_base_dir)
        return OwnerPrintingInterProcessFileLock(
            # N.B. This lock can't key into the actual named metadata dir (e.g. `.pids/pantsd/lock`
            # via `ProcessMetadataManager._get_metadata_dir_by_name()`) because of a need to purge
            # the named metadata dir on startup to avoid stale metadata reads.
            os.path.join(self._metadata_base_dir, ".lock.{}".format(self._name))
        )

    @property
    def cmdline(self):
        """The process commandline. e.g. ['/usr/bin/python2.7', 'pants.pex'].

        :returns: The command line or else `None` if the underlying process has died.
        """
        with swallow_psutil_exceptions():
            process = self._as_process()
            if process:
                return process.cmdline()
        return None

    @property
    def cmd(self):
        """The first element of the process commandline e.g. '/usr/bin/python2.7'.

        :returns: The first element of the process command line or else `None` if the underlying
                  process has died.
        """
        return (self.cmdline or [None])[0]

    @property
    def pid(self):
        """The running processes pid (or None)."""
        return self._pid or self.read_metadata_by_name(self._name, "pid", int)

    @property
    def socket(self):
        """The running processes socket/port information (or None)."""
        return self._socket or self.read_metadata_by_name(self._name, "socket", self._socket_type)

    @classmethod
    def get_subprocess_output(cls, command, ignore_stderr=True, **kwargs):
        """Get the output of an executed command.

        :param command: An iterable representing the command to execute (e.g. ['ls', '-al']).
        :param ignore_stderr: Whether or not to ignore stderr output vs interleave it with stdout.
        :raises: `ProcessManager.ExecutionError` on `OSError` or `CalledProcessError`.
        :returns: The output of the command.
        """
        if ignore_stderr is False:
            kwargs.setdefault("stderr", subprocess.STDOUT)

        try:
            return subprocess.check_output(command, **kwargs).decode().strip()
        except (OSError, subprocess.CalledProcessError) as e:
            subprocess_output = getattr(e, "output", "").strip()
            raise cls.ExecutionError(str(e), subprocess_output)

    def await_pid(self, timeout):
        """Wait up to a given timeout for a process to write pid metadata."""
        return self.await_metadata_by_name(self._name, "pid", timeout, int)

    def await_socket(self, timeout):
        """Wait up to a given timeout for a process to write socket info."""
        return self.await_metadata_by_name(self._name, "socket", timeout, self._socket_type)

    def write_pid(self, pid=None):
        """Write the current processes PID to the pidfile location."""
        pid = pid or os.getpid()
        self.write_metadata_by_name(self._name, "pid", str(pid))

    def write_socket(self, socket_info):
        """Write the local processes socket information (TCP port or UNIX socket)."""
        self.write_metadata_by_name(self._name, "socket", str(socket_info))

    def write_named_socket(self, socket_name, socket_info):
        """A multi-tenant, named alternative to ProcessManager.write_socket()."""
        self.write_metadata_by_name(self._name, "socket_{}".format(socket_name), str(socket_info))

    def read_named_socket(self, socket_name, socket_type):
        """A multi-tenant, named alternative to ProcessManager.socket."""
        return self.read_metadata_by_name(self._name, "socket_{}".format(socket_name), socket_type)

    def _as_process(self):
        """Returns a psutil `Process` object wrapping our pid.

        NB: Even with a process object in hand, subsequent method calls against it can always raise
        `NoSuchProcess`.  Care is needed to document the raises in the public API or else trap them and
        do something sensible for the API.

        :returns: a psutil Process object or else None if we have no pid.
        :rtype: :class:`psutil.Process`
        :raises: :class:`psutil.NoSuchProcess` if the process identified by our pid has died.
        """
        if self._process is None and self.pid:
            self._process = psutil.Process(self.pid)
        return self._process

    def is_dead(self):
        """Return a boolean indicating whether the process is dead or not."""
        return not self.is_alive()

    def is_alive(self, extended_check=None):
        """Return a boolean indicating whether the process is running or not.

        :param func extended_check: An additional callable that will be invoked to perform an extended
                                    liveness check. This callable should take a single argument of a
                                    `psutil.Process` instance representing the context-local process
                                    and return a boolean True/False to indicate alive vs not alive.
        """
        try:
            process = self._as_process()
            return not (
                # Can happen if we don't find our pid.
                (not process)
                or
                # Check for walkers.
                (process.status() == psutil.STATUS_ZOMBIE)
                or
                # Check for stale pids.
                (self.process_name and self.process_name != process.name())
                or
                # Extended checking.
                (extended_check and not extended_check(process))
            )
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # On some platforms, accessing attributes of a zombie'd Process results in NoSuchProcess.
            return False

    def purge_metadata(self, force=False):
        """Instance-based version of ProcessMetadataManager.purge_metadata_by_name() that checks for
        process liveness before purging metadata.

        :param bool force: If True, skip process liveness check before purging metadata.
        :raises: `ProcessManager.MetadataError` when OSError is encountered on metadata dir removal.
        """
        if not force and self.is_alive():
            raise ProcessMetadataManager.MetadataError(
                "cannot purge metadata for a running process!"
            )

        super().purge_metadata_by_name(self._name)

    def _kill(self, kill_sig):
        """Send a signal to the current process."""
        if self.pid:
            os.kill(self.pid, kill_sig)

    def terminate(self, signal_chain=KILL_CHAIN, kill_wait=KILL_WAIT_SEC, purge=True):
        """Ensure a process is terminated by sending a chain of kill signals (SIGTERM, SIGKILL)."""
        alive = self.is_alive()
        if alive:
            logger.debug("terminating {}".format(self._name))
            for signal_type in signal_chain:
                pid = self.pid
                try:
                    logger.debug("sending signal {} to pid {}".format(signal_type, pid))
                    self._kill(signal_type)
                except OSError as e:
                    logger.warning(
                        "caught OSError({e!s}) during attempt to kill -{signal} {pid}!".format(
                            e=e, signal=signal_type, pid=pid
                        )
                    )

                # Wait up to kill_wait seconds to terminate or move onto the next signal.
                try:
                    if self._deadline_until(self.is_dead, "daemon to exit", timeout=kill_wait):
                        alive = False
                        logger.debug("successfully terminated pid {}".format(pid))
                        break
                except self.Timeout:
                    # Loop to the next kill signal on timeout.
                    pass

        if alive:
            raise ProcessManager.NonResponsiveProcess(
                "failed to kill pid {pid} with signals {chain}".format(
                    pid=self.pid, chain=signal_chain
                )
            )

        if purge:
            self.purge_metadata(force=True)

    def daemonize(
        self,
        pre_fork_opts=None,
        post_fork_parent_opts=None,
        post_fork_child_opts=None,
        fork_context=None,
        write_pid=True,
    ):
        """Perform a double-fork, execute callbacks and write the child pid file.

        The double-fork here is necessary to truly daemonize the subprocess such that it can never
        take control of a tty. The initial fork and setsid() creates a new, isolated process group
        and also makes the first child a session leader (which can still acquire a tty). By forking a
        second time, we ensure that the second child can never acquire a controlling terminal because
        it's no longer a session leader - but it now has its own separate process group.

        Additionally, a normal daemon implementation would typically perform an os.umask(0) to reset
        the processes file mode creation mask post-fork. We do not do this here (and in daemon_spawn
        below) due to the fact that the daemons that pants would run are typically personal user
        daemons. Having a disparate umask from pre-vs-post fork causes files written in each phase to
        differ in their permissions without good reason - in this case, we want to inherit the umask.

        :param fork_context: A function which accepts and calls a function that will call fork. This
          is not a contextmanager/generator because that would make interacting with native code more
          challenging. If no fork_context is passed, the fork function is called directly.
        """

        def double_fork():
            logger.debug("forking %s", self)
            pid = os.fork()
            if pid == 0:
                os.setsid()
                second_pid = os.fork()
                if second_pid == 0:
                    return False, True
                else:
                    if write_pid:
                        self.write_pid(second_pid)
                    return False, False
            else:
                # This prevents un-reaped, throw-away parent processes from lingering in the process table.
                os.waitpid(pid, 0)
                return True, False

        fork_func = functools.partial(fork_context, double_fork) if fork_context else double_fork

        # Perform the double fork (optionally under the fork_context). Three outcomes are possible after
        # the double fork: we're either the original parent process, the middle double-fork process, or
        # the child. We assert below that a process is not somehow both the parent and the child.
        self.purge_metadata()
        self.pre_fork(**pre_fork_opts or {})
        is_parent, is_child = fork_func()

        try:
            if not is_parent and not is_child:
                # Middle process.
                os._exit(0)
            elif is_parent:
                assert not is_child
                self.post_fork_parent(**post_fork_parent_opts or {})
            else:
                assert not is_parent
                os.chdir(self._buildroot)
                self.post_fork_child(**post_fork_child_opts or {})
        except Exception:
            logger.critical(traceback.format_exc())
            os._exit(0)

    def daemon_spawn(
        self, pre_fork_opts=None, post_fork_parent_opts=None, post_fork_child_opts=None
    ):
        """Perform a single-fork to run a subprocess and write the child pid file.

        Use this if your post_fork_child block invokes a subprocess via subprocess.Popen(). In this
        case, a second fork such as used in daemonize() is extraneous given that Popen() also forks.
        Using this daemonization method vs daemonize() leaves the responsibility of writing the pid
        to the caller to allow for library-agnostic flexibility in subprocess execution.
        """
        self.purge_metadata()
        self.pre_fork(**pre_fork_opts or {})
        pid = os.fork()
        if pid == 0:
            # fork's child execution
            try:
                os.setsid()
                os.chdir(self._buildroot)
                self.post_fork_child(**post_fork_child_opts or {})
            except Exception:
                logger.critical(traceback.format_exc())
            finally:
                os._exit(0)
        else:
            # fork's parent execution
            try:
                self.post_fork_parent(**post_fork_parent_opts or {})
            except Exception:
                logger.critical(traceback.format_exc())

    def pre_fork(self):
        """Pre-fork callback for subclasses."""

    def post_fork_child(self):
        """Pre-fork child callback for subclasses."""

    def post_fork_parent(self):
        """Post-fork parent callback for subclasses."""


class FingerprintedProcessManager(ProcessManager):
    """A `ProcessManager` subclass that provides a general strategy for process fingerprinting."""

    FINGERPRINT_KEY = "fingerprint"
    FINGERPRINT_CMD_KEY: Optional[str] = None
    FINGERPRINT_CMD_SEP = "="

    @property
    def fingerprint(self):
        """The fingerprint of the current process.

        This can either read the current fingerprint from the running process's psutil.Process.cmdline
        (if the managed process supports that) or from the `ProcessManager` metadata.

        :returns: The fingerprint of the running process as read from the process table, ProcessManager
                  metadata or `None`.
        :rtype: string
        """
        return self.parse_fingerprint(self.cmdline) or self.read_metadata_by_name(
            self.name, self.FINGERPRINT_KEY
        )

    def parse_fingerprint(self, cmdline, key=None, sep=None):
        """Given a psutil.Process.cmdline, parse and return a fingerprint.

        :param list cmdline: The psutil.Process.cmdline of the current process.
        :param string key: The key for fingerprint discovery.
        :param string sep: The key/value separator for fingerprint discovery.
        :returns: The parsed fingerprint or `None`.
        :rtype: string or `None`
        """
        key = key or self.FINGERPRINT_CMD_KEY
        if key:
            sep = sep or self.FINGERPRINT_CMD_SEP
            cmdline = cmdline or []
            for cmd_part in cmdline:
                if cmd_part.startswith("{}{}".format(key, sep)):
                    return cmd_part.split(sep)[1]

    def has_current_fingerprint(self, fingerprint):
        """Determines if a new fingerprint is the current fingerprint of the running process.

        :param string fingerprint: The new fingerprint to compare to.
        :rtype: bool
        """
        return fingerprint == self.fingerprint

    def needs_restart(self, fingerprint):
        """Determines if the current ProcessManager needs to be started or restarted.

        :param string fingerprint: The new fingerprint to compare to.
        :rtype: bool
        """
        return self.is_dead() or not self.has_current_fingerprint(fingerprint)