Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
lutris / usr / lib / python3 / dist-packages / lutris / installer / commands.py
Size: Mime:
"""Commands for installer scripts"""
# Standard Library
import glob
import json
import multiprocessing
import os
import shlex
import shutil
import stat
from gettext import gettext as _

# Third Party Libraries
from gi.repository import GLib

# Lutris Modules
from lutris import runtime
from lutris.cache import get_cache_path
from lutris.command import MonitoredCommand
from lutris.installer.errors import ScriptingError
from lutris.runners import import_task
from lutris.util import disks, extract, selective_merge, system
from lutris.util.fileio import EvilConfigParser, MultiOrderedDict
from lutris.util.log import logger
from lutris.util.wine.wine import WINE_DEFAULT_ARCH, get_wine_version_exe


class CommandsMixin:

    """The directives for the `installer:` part of the install script."""

    def __init__(self):
        if isinstance(self, CommandsMixin):
            raise RuntimeError("This class is a mixin")

    def _get_runner_version(self):
        """Return the version of the runner used for the installer"""
        if self.runner in ("wine", "winesteam") and self.script.get(self.runner):
            return self.script[self.runner].get("version")
        if self.runner == "libretro":
            return self.script["game"]["core"]
        return None

    @staticmethod
    def _check_required_params(params, command_data, command_name):
        """Verify presence of a list of parameters required by a command."""
        if isinstance(params, str):
            params = [params]
        for param in params:
            if isinstance(param, tuple):
                param_present = False
                for key in param:
                    if key in command_data:
                        param_present = True
                if not param_present:
                    raise ScriptingError(
                        "One of %s parameter is mandatory for the %s command" % (" or ".join(param), command_name),
                        command_data,
                    )
            else:
                if param not in command_data:
                    raise ScriptingError(
                        "The %s parameter is mandatory for the %s command" % (param, command_name),
                        command_data,
                    )

    @staticmethod
    def _is_cached_file(file_path):
        """Return whether a file referenced by file_id is stored in the cache"""
        pga_cache_path = get_cache_path()
        if not pga_cache_path:
            return False
        return file_path.startswith(pga_cache_path)

    def chmodx(self, filename):
        """Make filename executable"""
        filename = self._substitute(filename)
        file_stats = os.stat(filename)
        os.chmod(filename, file_stats.st_mode | stat.S_IEXEC)

    def execute(self, data):
        """Run an executable file."""
        args = []
        terminal = None
        working_dir = None
        env = {}
        if isinstance(data, dict):
            self._check_required_params([("file", "command")], data, "execute")
            if "command" in data and "file" in data:
                raise ScriptingError(
                    "Parameters file and command can't be used "
                    "at the same time for the execute command",
                    data,
                )
            file_ref = data.get("file", "")
            command = data.get("command", "")
            args_string = data.get("args", "")
            for arg in shlex.split(args_string):
                args.append(self._substitute(arg))
            terminal = data.get("terminal")
            working_dir = data.get("working_dir")
            if not data.get("disable_runtime"):
                # Possibly need to handle prefer_system_libs here
                env.update(runtime.get_env())

            # Loading environment variables set in the script
            env.update(self.script_env)

            # Environment variables can also be passed to the execute command
            local_env = data.get("env") or {}
            env.update({key: self._substitute(value) for key, value in local_env.items()})
            include_processes = shlex.split(data.get("include_processes", ""))
            exclude_processes = shlex.split(data.get("exclude_processes", ""))
        elif isinstance(data, str):
            command = data
            include_processes = []
            exclude_processes = []
        else:
            raise ScriptingError("No parameters supplied to execute command.", data)

        if command:
            file_ref = "bash"
            args = ["-c", self._get_file(command.strip())]
            include_processes.append("bash")
        else:
            # Determine whether 'file' value is a file id or a path
            file_ref = self._get_file(file_ref)

        exec_path = system.find_executable(file_ref)
        if not exec_path:
            raise ScriptingError("Unable to find executable %s" % file_ref)
        if not os.access(exec_path, os.X_OK):
            logger.warning("Making %s executable", exec_path)
            self.chmodx(exec_path)

        if terminal:
            terminal = system.get_default_terminal()

        if not working_dir or not os.path.exists(working_dir):
            working_dir = self.target_path

        command = MonitoredCommand(
            [exec_path] + args,
            env=env,
            term=terminal,
            cwd=working_dir,
            include_processes=include_processes,
            exclude_processes=exclude_processes,
        )
        command.start()
        GLib.idle_add(self.parent.attach_logger, command)
        self.heartbeat = GLib.timeout_add(1000, self._monitor_task, command)
        return "STOP"

    def extract(self, data):
        """Extract a file, guessing the compression method."""
        self._check_required_params([("file", "src")], data, "extract")
        src_param = data.get("file") or data.get("src")
        filespec = self._get_file(src_param)

        filenames = glob.glob(filespec)

        if not filenames:
            raise ScriptingError("%s does not exist" % filespec)
        if "dst" in data:
            dest_path = self._substitute(data["dst"])
        else:
            dest_path = self.target_path
        for filename in filenames:
            msg = _("Extracting %s") % os.path.basename(filename)
            logger.debug(msg)
            GLib.idle_add(self.parent.set_status, msg)
            merge_single = "nomerge" not in data
            extractor = data.get("format")
            logger.debug("extracting file %s to %s", filename, dest_path)
            self._killable_process(extract.extract_archive, filename, dest_path, merge_single, extractor)

    def input_menu(self, data):
        """Display an input request as a dropdown menu with options."""
        self._check_required_params("options", data, "input_menu")
        identifier = data.get("id")
        alias = "INPUT_%s" % identifier if identifier else None
        has_entry = data.get("entry")
        options = data["options"]
        preselect = self._substitute(data.get("preselect", ""))
        GLib.idle_add(
            self.parent.input_menu,
            alias,
            options,
            preselect,
            has_entry,
            self._on_input_menu_validated,
        )
        return "STOP"

    def _on_input_menu_validated(self, _widget, *args):
        alias = args[0]
        menu = args[1]
        choosen_option = menu.get_active_id()
        if choosen_option:
            self.user_inputs.append({"alias": alias, "value": choosen_option})
            GLib.idle_add(self.parent.continue_button.hide)
            self._iter_commands()

    def insert_disc(self, data):
        """Request user to insert an optical disc"""
        self._check_required_params("requires", data, "insert_disc")
        requires = data.get("requires")
        message = data.get(
            "message",
            _("Insert or mount game disc and click Autodetect or\n"
              "use Browse if the disc is mounted on a non standard location."),
        )
        message += (
            _("\n\nLutris is looking for a mounted disk drive or image \n"
              "containing the following file or folder:\n"
              "<i>%s</i>") % requires
        )
        if self.runner == "wine":
            GLib.idle_add(self.parent.eject_button.show)
        GLib.idle_add(self.parent.ask_for_disc, message, self._find_matching_disc, requires)
        return "STOP"

    def _find_matching_disc(self, _widget, requires, extra_path=None):
        if extra_path:
            drives = [extra_path]
        else:
            drives = disks.get_mounted_discs()
        for drive in drives:
            required_abspath = os.path.join(drive, requires)
            required_abspath = system.fix_path_case(required_abspath)
            if required_abspath:
                logger.debug("Found %s on cdrom %s", requires, drive)
                self.game_disc = drive
                self._iter_commands()
                break

    def mkdir(self, directory):
        """Create directory"""
        directory = self._substitute(directory)
        try:
            os.makedirs(directory)
        except OSError:
            logger.debug("Directory %s already exists", directory)
        else:
            logger.debug("Created directory %s", directory)

    def merge(self, params):
        """Merge the contents given by src to destination folder dst"""
        self._check_required_params(["src", "dst"], params, "merge")
        src, dst = self._get_move_paths(params)
        logger.debug("Merging %s into %s", src, dst)
        if not os.path.exists(src):
            raise ScriptingError("Source does not exist: %s" % src, params)
        if not os.path.exists(dst):
            os.makedirs(dst)
        if os.path.isfile(src):
            # If single file, copy it and change reference in game file so it
            # can be used as executable. Skip copying if the source is the same
            # as destination.
            if os.path.dirname(src) != dst:
                self._killable_process(shutil.copy, src, dst)
            if params["src"] in self.game_files.keys():
                self.game_files[params["src"]] = os.path.join(dst, os.path.basename(src))
            return
        self._killable_process(system.merge_folders, src, dst)

    def copy(self, params):
        """Alias for merge"""
        self.merge(params)

    def move(self, params):
        """Move a file or directory into a destination folder."""
        self._check_required_params(["src", "dst"], params, "move")
        src, dst = self._get_move_paths(params)
        logger.debug("Moving %s to %s", src, dst)
        if not os.path.exists(src):
            raise ScriptingError("I can't move %s, it does not exist" % src)

        if os.path.isfile(src):
            if os.path.dirname(src) == dst:
                logger.info("Source file is the same as destination, skipping")
                return

            if os.path.exists(os.path.join(dst, os.path.basename(src))):
                # May not be the best choice, but it's the safest.
                # Maybe should display confirmation dialog (Overwrite / Skip) ?
                logger.info("Destination file exists, skipping")
                return
        try:
            if self._is_cached_file(src):
                action = shutil.copy
            else:
                action = shutil.move
            self._killable_process(action, src, dst)
        except shutil.Error:
            raise ScriptingError("Can't move %s \nto destination %s" % (src, dst))

    def rename(self, params):
        """Rename file or folder."""
        self._check_required_params(["src", "dst"], params, "rename")
        src, dst = self._get_move_paths(params)
        if not os.path.exists(src):
            raise ScriptingError("Rename error, source path does not exist: %s" % src)
        if os.path.isdir(dst):
            try:
                os.rmdir(dst)  # Remove if empty
            except OSError:
                pass
        if os.path.exists(dst):
            raise ScriptingError("Rename error, destination already exists: %s" % src)
        dst_dir = os.path.dirname(dst)

        # Pre-move on dest filesystem to avoid error with
        # os.rename through different filesystems
        temp_dir = os.path.join(dst_dir, "lutris_rename_temp")
        os.makedirs(temp_dir)
        self._killable_process(shutil.move, src, temp_dir)
        src = os.path.join(temp_dir, os.path.basename(src))
        os.renames(src, dst)

    def _get_move_paths(self, params):
        """Process raw 'src' and 'dst' data."""
        try:
            src_ref = params["src"]
        except KeyError:
            raise ScriptingError("Missing parameter src")
        src = self.game_files.get(src_ref) or self._substitute(src_ref)
        if not src:
            raise ScriptingError("Wrong value for 'src' param", src_ref)
        dst_ref = params["dst"]
        dst = self._substitute(dst_ref)
        if not dst:
            raise ScriptingError("Wrong value for 'dst' param", dst_ref)
        return src.rstrip("/"), dst.rstrip("/")

    def substitute_vars(self, data):
        """Subsitute variable names found in given file."""
        self._check_required_params("file", data, "substitute_vars")
        filename = self._substitute(data["file"])
        logger.debug("Substituting variables for file %s", filename)
        tmp_filename = filename + ".tmp"
        with open(filename, "r") as source_file:
            with open(tmp_filename, "w") as dest_file:
                line = "."
                while line:
                    line = source_file.readline()
                    line = self._substitute(line)
                    dest_file.write(line)
        os.rename(tmp_filename, filename)

    def _get_task_runner_and_name(self, task_name):
        if "." in task_name:
            # Run a task from a different runner
            # than the one for this installer
            runner_name, task_name = task_name.split(".")
        else:
            runner_name = self.runner
        return runner_name, task_name

    def task(self, data):
        """Directive triggering another function specific to a runner.

        The 'name' parameter is mandatory. If 'args' is provided it will be
        passed to the runner task.
        """
        self._check_required_params("name", data, "task")
        if self.parent:
            GLib.idle_add(self.parent.cancel_button.set_sensitive, False)
        runner_name, task_name = self._get_task_runner_and_name(data.pop("name"))

        wine_version = None

        if runner_name.startswith("wine"):
            wine_version = self._get_runner_version()
            if wine_version:
                data["wine_path"] = get_wine_version_exe(wine_version)
            data["prefix"] = data.get("prefix") \
                or self.script.get("game", {}).get("prefix") \
                or "$GAMEDIR"
            data["arch"] = data.get("arch") \
                or self.script.get("game", {}).get("arch") \
                or WINE_DEFAULT_ARCH
            if task_name == "wineexec" and self.script_env:
                data["env"] = self.script_env

        for key in data:
            value = data[key]
            if isinstance(value, dict):
                for inner_key in value:
                    value[inner_key] = self._substitute(value[inner_key])
            elif isinstance(value, list):
                for index, elem in enumerate(value):
                    value[index] = self._substitute(elem)
            else:
                value = self._substitute(data[key])
            data[key] = value

        task = import_task(runner_name, task_name)
        thread = task(**data)
        GLib.idle_add(self.parent.cancel_button.set_sensitive, True)
        if isinstance(thread, MonitoredCommand):
            # Monitor thread and continue when task has executed
            GLib.idle_add(self.parent.attach_logger, thread)
            self.heartbeat = GLib.timeout_add(1000, self._monitor_task, thread)
            return "STOP"
        return None

    def _monitor_task(self, thread):
        if not thread.is_running:
            self._iter_commands()
            return False
        return True

    def write_file(self, params):
        """Write text to a file."""
        self._check_required_params(["file", "content"], params, "write_file")

        # Get file
        dest_file_path = self._get_file(params["file"])

        # Create dir if necessary
        basedir = os.path.dirname(dest_file_path)
        if not os.path.exists(basedir):
            os.makedirs(basedir)

        mode = params.get("mode", "w")
        if not mode.startswith(("a", "w")):
            raise ScriptingError("Wrong value for write_file mode: '%s'" % mode)

        with open(dest_file_path, mode) as dest_file:
            dest_file.write(self._substitute(params["content"]))

    def write_json(self, params):
        """Write data into a json file."""
        self._check_required_params(["file", "data"], params, "write_json")

        # Get file
        filename = self._get_file(params["file"])

        # Create dir if necessary
        basedir = os.path.dirname(filename)
        if not os.path.exists(basedir):
            os.makedirs(basedir)

        merge = params.get("merge", True)

        if not os.path.exists(filename):
            # create an empty file
            with open(filename, "a+"):
                pass

        with open(filename, "r+" if merge else "w") as json_file:
            json_data = {}
            if merge:
                try:
                    json_data = json.load(json_file)
                except ValueError:
                    logger.error("Failed to parse JSON from file %s", filename)

            json_data = selective_merge(json_data, params.get("data", {}))
            json_file.seek(0)
            json_file.write(json.dumps(json_data, indent=2))

    def write_config(self, params):
        """Write a key-value pair into an INI type config file."""
        if params.get("data", None):
            self._check_required_params(["file", "data"], params, "write_config")
        else:
            self._check_required_params(["file", "section", "key", "value"], params, "write_config")
        # Get file
        config_file_path = self._get_file(params["file"])

        # Create dir if necessary
        basedir = os.path.dirname(config_file_path)
        if not os.path.exists(basedir):
            os.makedirs(basedir)

        merge = params.get("merge", True)

        parser = EvilConfigParser(allow_no_value=True, dict_type=MultiOrderedDict, strict=False)
        parser.optionxform = str  # Preserve text case
        if merge:
            parser.read(config_file_path)

        data = {}
        if params.get("data", None):
            data = params["data"]
        else:
            data[params["section"]] = {}
            data[params["section"]][params["key"]] = params["value"]

        for section, keys in data.items():
            if not parser.has_section(section):
                parser.add_section(section)
            for key, value in keys.items():
                value = self._substitute(value)
                parser.set(section, key, value)

        with open(config_file_path, "wb") as config_file:
            parser.write(config_file)

    def _get_file(self, fileid):
        file_path = self.game_files.get(fileid)
        if not file_path:
            file_path = self._substitute(fileid)
        return file_path

    def _killable_process(self, func, *args, **kwargs):
        """Run function `func` in a separate, killable process."""
        process = multiprocessing.Pool(1)
        result_obj = process.apply_async(func, args, kwargs)
        self.abort_current_task = process.terminate
        result = result_obj.get()  # Wait process end & reraise exceptions
        self.abort_current_task = None
        return result