Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
shellting / src / shellting / shell_runner.py
Size: Mime:
# -*- coding: utf-8 -*-
import io
import logging
import os
import stat

from plumbum import SshMachine, local, ProcessExecutionError

log = logging.getLogger("freckles")

SUCCESS = 0
NO_CHANGE = 100
SKIPPED = 101


class ShellRunner(object):
    def __init__(self, shellting_context):

        self.shellting_context = shellting_context

    def render_environment(self, run_env_dir, commands, functions, resources=None):

        result = {"run_dir": run_env_dir, "run_dir_name": os.path.basename(run_env_dir)}

        working_dir = os.path.join(run_env_dir, "working_dir")
        os.mkdir(working_dir)
        executables_dir = os.path.join(run_env_dir, "executables")
        os.mkdir(executables_dir)
        result["working_dir"] = working_dir
        result["executables_dir"] = executables_dir

        log.debug("Creating shell script from template...")
        template = self.shellting_context.get_template("shell_runner.sh.j2")

        extra_paths = []

        # if resources is None:
        #     resources = {}

        # all_exodus_binaries = []
        # for res_type, resource in resources.items():
        #     pass

        # if not skip_exodus:
        #     exodus_bundle = os.path.join(run_env_dir, "exodus-binaries", "bundle.sh")
        #     exodus_cmd = local["exodus"]
        #     exodus_args = ["-o", exodus_bundle]
        #     exodus_args.append(all_exodus_binaries)
        #     rc, stdout, stderr = exodus_cmd.run(exodus_args)

        result["command"] = commands

        repl_dict = {
            "extra_paths": extra_paths,
            "functions": functions,
            "commands": commands,
        }
        rendered = template.render(repl_dict)

        run_script = os.path.join(run_env_dir, "run.sh")
        with io.open(run_script, "w", encoding="utf-8") as rs:
            rs.write(rendered)

        # make run.sh executable
        st = os.stat(run_script)
        os.chmod(run_script, st.st_mode | stat.S_IEXEC)
        result["run_script"] = run_script

        return result

    def run(
        self,
        run_properties,
        run_cnf,
        # output_callback,
        result_callback,
        parent_task,
        # callback_adapter,
        delete_env=False,
    ):

        # run_dir = run_properties["run_dir"]

        hostname = run_cnf.get("host")
        connection_type = run_cnf.get("connection_type", None)
        if connection_type is None:
            if hostname in ["localhost", "127.0.0.1"]:
                connection_type = "local"
            else:
                connection_type = "ssh"

        if connection_type == "ssh":

            remote = True
            ssh_key = run_cnf.get("ssh_key")
            user = run_cnf.get("user")
            host_ip = run_cnf.get("host_ip")
            ssh_port = run_cnf.get("ssh_port")

            # otherwise we run into problems with Vagrant
            if host_ip:
                h = host_ip
            else:
                h = hostname

            machine = SshMachine(h, port=ssh_port, user=user, keyfile=ssh_key)
        else:
            remote = False
            machine = local

        no_run = run_cnf.get("no_run")

        if no_run:
            return run_properties

        if remote:
            raise Exception("Not implemented yet")
            # copy execution environment
            # td = TaskDetail(
            #     "uploading execution environment", task_type="upload", task_parent=td
            # )
            # upload_task = td.add_subtask(
            #     "uploading execution environment", category="upload"
            # )
            # machine.upload(run_dir, "/tmp")
            # upload_task.finish(success=True, changed=True, skipped=False)
            # run_script = os.path.join("/tmp", run_properties["run.sh"])
        else:
            run_script = run_properties["run_script"]

        machine.env["ECHO_TASK_START"] = "true"
        machine.env["ECHO_TASK_FINISHED"] = "true"

        cmd = machine["bash"]

        # rc, stdout, stderr = cmd.run([run_script], retcode=None)
        current_task = parent_task
        current_task_stdout = None
        current_task_stderr = None

        pending_finshed = None

        try:

            popen = cmd.popen(run_script)

            current_task_id = -1

            log.debug("Reading command output...")
            for line in popen.iter_lines():
                log.debug(line)

                stderr = line[1]
                stdout = line[0]
                stderr_processed = False
                if stdout:
                    if stdout.startswith("STARTING_TASK["):

                        if pending_finshed:

                            stdout_msg = "\n".join(current_task_stdout)
                            stderr_msg = "\n".join(current_task_stderr)

                            current_task = current_task.finish(
                                msg=stdout_msg, error_msg=stderr_msg, **pending_finshed
                            )
                            pending_finshed = None

                        current_task_stdout = []
                        current_task_stderr = []
                        index = stdout.index("]")
                        task_id = int(stdout[14:index])
                        current_msg = stdout[index + 2 :].strip()  # noqa
                        # print(stdout)
                        if task_id > current_task_id:
                            # print("starting: {}".format(msg))
                            # td = TaskDetail(
                            #     task_name=current_msg,
                            #     task_type="script-command",
                            #     task_parent=current_task,
                            #     task_title=current_msg,
                            #     freckles_task_id=task_id,
                            # )
                            current_task = current_task.add_subtask(
                                task_name=current_msg,
                                category="script-command",
                                reference=task_id,
                            )
                            # output_callback.task_started(td)
                            current_task_id = task_id
                        if stderr:
                            current_task_stderr.append(stderr)
                            stderr_processed = True

                    elif stdout.startswith("FINISHED_TASK["):
                        index = stdout.index("]")
                        task_id = int(stdout[14:index])
                        rc = int(stdout[index + 2 :])  # noqa
                        if rc == 0:
                            success = True
                            skipped = False
                            changed = True
                        elif rc == 100:
                            success = True
                            skipped = False
                            changed = False
                        elif rc == 101:
                            success = True
                            skipped = True
                            changed = False
                        else:
                            success = False
                            skipped = None
                            changed = None

                        if stderr:
                            current_task_stderr.append(stderr)
                            stderr_processed = True

                        pending_finshed = dict(
                            success=success, changed=changed, skipped=skipped
                        )

                    else:
                        if stdout:
                            if stdout.lower().startswith("error:"):
                                current_task_stderr.append(stdout[6:])
                            else:
                                current_task_stdout.append(stdout)

                if stderr and not stderr_processed:
                    current_task_stderr.append(stderr)

            rc = popen._proc.returncode

            if pending_finshed:

                stdout_msg = "\n".join(current_task_stdout)
                stderr_msg = "\n".join(current_task_stderr)

                current_task = current_task.finish(
                    msg=stdout_msg, error_msg=stderr_msg, **pending_finshed
                )

            run_properties["return_code"] = rc
            run_properties["signal_status"] = -1

        except (ProcessExecutionError) as e:
            # for l in current_task_stdout:
            #     current_task.add_result_msg(l)
            # for l in current_task_stderr:
            #     current_task.add_result_error(l)
            current_task.finish(success=False, msg=stdout, error_msg=str(e))

        if remote:
            raise Exception("not implemented yet")
            # if delete_env:
            #     td = TaskDetail(
            #         "deleting execution environment",
            #         task_type="delete",
            #         task_parent=current_task,
            #     )
            #     task_
            #     output_callback.task_started(td)
            #     delete = machine["rm"]
            #     rc, stdout, stderr = delete.run(
            #         ["-r", os.path.join("/tmp", run_properties["env_dir_name"])]
            #     )
            #     success = rc == 0
            #     output_callback.register_task_finished(td, success=success)
            # machine.close()

        return run_properties