Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
lutris / usr / lib / python3 / dist-packages / lutris / command.py
Size: Mime:
"""Threading module, used to launch games while monitoring them."""

# Standard Library
import contextlib
import fcntl
import io
import os
import shlex
import subprocess
import sys
from textwrap import dedent

# Third Party Libraries
from gi.repository import GLib

# Lutris Modules
from lutris import runtime, settings
from lutris.util import system
from lutris.util.log import logger


def get_wrapper_script_location():
    """Return absolute path of lutris-wrapper script"""
    wrapper_relpath = "share/lutris/bin/lutris-wrapper"
    candidates = [
        os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])), "..")),
        os.path.dirname(os.path.dirname(settings.__file__)),
        "/usr",
        "/usr/local",
    ]
    for candidate in candidates:
        wrapper_abspath = os.path.join(candidate, wrapper_relpath)
        if os.path.isfile(wrapper_abspath):
            return wrapper_abspath
    raise FileNotFoundError("Couldn't find lutris-wrapper script in any of the expected locations")


WRAPPER_SCRIPT = get_wrapper_script_location()


class MonitoredCommand:

    """Exexcutes a commmand while keeping track of its state"""

    fallback_cwd = "/tmp"

    def __init__(
        self,
        command,
        runner=None,
        env=None,
        term=None,
        cwd=None,
        include_processes=None,
        exclude_processes=None,
        log_buffer=None,
        title=None,
    ):  # pylint: disable=too-many-arguments
        self.ready_state = True
        self.env = self.get_environment(env)

        self.command = command
        self.runner = runner
        self.stop_func = lambda: True
        self.game_process = None
        self.prevent_on_stop = False
        self.return_code = None
        self.terminal = system.find_executable(term)
        self.is_running = True
        self.error = None
        self.log_handlers = [
            self.log_handler_stdout,
            self.log_handler_console_output,
        ]
        self.set_log_buffer(log_buffer)
        self.stdout_monitor = None
        self.include_processes = include_processes or []
        self.exclude_processes = exclude_processes or []

        self.cwd = self.get_cwd(cwd)

        self._stdout = io.StringIO()

        self._title = title if title else command[0]

    @property
    def stdout(self):
        return self._stdout.getvalue()

    @property
    def wrapper_command(self):
        """Return launch arguments for the wrapper script"""

        return [
            WRAPPER_SCRIPT,
            self._title,
            str(len(self.include_processes)),
            str(len(self.exclude_processes)),
        ] + self.include_processes + self.exclude_processes + self.command

    def set_log_buffer(self, log_buffer):
        """Attach a TextBuffer to this command enables the buffer handler"""
        if not log_buffer:
            return
        self.log_buffer = log_buffer
        if self.log_handler_buffer not in self.log_handlers:
            self.log_handlers.append(self.log_handler_buffer)

    def get_cwd(self, cwd):
        """Return the current working dir of the game"""
        if not cwd:
            cwd = self.runner.working_dir if self.runner else None
        return os.path.expanduser(cwd or "~")

    @staticmethod
    def get_environment(user_env):
        """Process the user provided environment variables for use as self.env"""
        env = user_env or {}
        # not clear why this needs to be added, the path is already added in
        # the wrappper script.
        env['PYTHONPATH'] = ':'.join(sys.path)
        # Drop bad values of environment keys, those will confuse the Python
        # interpreter.
        return {key: value for key, value in env.items() if "=" not in key}

    def get_child_environment(self):
        """Returns the calculated environment for the child process."""
        env = os.environ.copy()
        env.update(self.env)
        return env

    def start(self):
        """Run the thread."""
        for key, value in self.env.items():
            logger.debug("%s=\"%s\"", key, value)
        logger.debug(" ".join(self.wrapper_command))

        if self.terminal:
            self.game_process = self.run_in_terminal()
        else:
            env = self.get_child_environment()
            self.game_process = self.execute_process(self.wrapper_command, env)

        if not self.game_process:
            logger.error("No game process available")
            return

        GLib.child_watch_add(self.game_process.pid, self.on_stop)

        # make stdout nonblocking.
        fileno = self.game_process.stdout.fileno()
        fcntl.fcntl(fileno, fcntl.F_SETFL, fcntl.fcntl(fileno, fcntl.F_GETFL) | os.O_NONBLOCK)

        self.stdout_monitor = GLib.io_add_watch(
            self.game_process.stdout,
            GLib.IO_IN | GLib.IO_HUP,
            self.on_stdout_output,
        )

    def log_handler_stdout(self, line):
        """Add the line to this command's stdout attribute"""
        self._stdout.write(line)

    def log_handler_buffer(self, line):
        """Add the line to the associated LogBuffer object"""
        self.log_buffer.insert(self.log_buffer.get_end_iter(), line, -1)

    def log_handler_console_output(self, line):  # pylint: disable=no-self-use
        """Print the line to stdout"""
        with contextlib.suppress(BlockingIOError):
            sys.stdout.write(line)
            sys.stdout.flush()

    def on_stop(self, _pid, returncode):
        """Callback registered on game process termination"""
        if self.prevent_on_stop:  # stop() already in progress
            return False

        logger.debug("The process has terminated with code %s", returncode)
        self.is_running = False
        self.return_code = returncode

        resume_stop = self.stop()
        if not resume_stop:
            logger.info("Full shutdown prevented")
            return False

        return False

    def on_stdout_output(self, stdout, condition):
        """Called by the stdout monitor to dispatch output to log handlers"""
        if condition == GLib.IO_HUP:
            self.stdout_monitor = None
            return False
        if not self.is_running:
            return False
        try:
            line = stdout.read(262144).decode("utf-8", errors="ignore")
        except ValueError:
            # file_desc might be closed
            return True
        if "winemenubuilder.exe" in line:
            return True
        for log_handler in self.log_handlers:
            log_handler(line)
        return True

    def run_in_terminal(self):
        """Write command in a script file and run it.

        Running it from a file is likely the only way to set env vars only
        for the command (not for the terminal app).
        It's also the only reliable way to keep the term open when the
        game is quit.
        """
        script_path = os.path.join(settings.CACHE_DIR, "run_in_term.sh")
        exported_environment = "\n".join('export %s="%s" ' % (key, value) for key, value in self.env.items())
        command = " ".join(['"%s"' % token for token in self.wrapper_command])
        with open(script_path, "w") as script_file:
            script_file.write(
                dedent(
                    """#!/bin/sh
                cd "%s"
                %s
                exec %s
                """ % (self.cwd, exported_environment, command)
                )
            )
            os.chmod(script_path, 0o744)
        return self.execute_process([self.terminal, "-e", script_path])

    def execute_process(self, command, env=None):
        """Execute and return a subprocess"""
        if self.cwd and not system.path_exists(self.cwd):
            try:
                os.makedirs(self.cwd)
            except OSError:
                logger.error("Failed to create working directory, falling back to %s", self.fallback_cwd)
                self.cwd = "/tmp"
        try:

            return subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                cwd=self.cwd,
                env=env,
            )
        except OSError as ex:
            logger.exception("Failed to execute %s: %s", " ".join(command), ex)
            self.error = ex.strerror

    def stop(self):
        """Stops the current game process and cleans up the instance"""
        # Prevent stop() being called again by the process exiting
        self.prevent_on_stop = True

        try:
            self.game_process.terminate()
        except ProcessLookupError:  # process already dead.
            logger.debug("Management process looks dead already.")

        if hasattr(self, "stop_func"):
            resume_stop = self.stop_func()
            if not resume_stop:
                return False

        if self.stdout_monitor:
            GLib.source_remove(self.stdout_monitor)
            self.stdout_monitor = None

        self.is_running = False
        self.ready_state = False
        return True


def exec_command(command):
    """Execute arbitrary command in a MonitoredCommand

    Used by the --exec command line flag.
    """
    command = MonitoredCommand(shlex.split(command), env=runtime.get_env())
    command.start()
    return command