Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from pants.base.exceptions import TargetDefinitionException, TaskError
from pants.base.workunit import WorkUnitLabel
from pants.task.testrunner_task_mixin import TestRunnerTaskMixin
from pants.util.contextutil import pushd
from pants.util.process_handler import SubprocessProcessHandler

from pants.contrib.node.tasks.node_paths import NodePaths
from pants.contrib.node.tasks.node_task import NodeTask


class NodeTest(TestRunnerTaskMixin, NodeTask):
    """Runs a test script from package.json in a NodeModule, currently via "npm run [script name]".

    Implementations of abstract methods from TestRunnerTaskMixin: _execute, _spawn,
    _test_target_filter, _validate_target
    """

    def __init__(self, *args, **kwargs):
        """Forwards all arguments to the parent classes and initialises per-run state."""
        super().__init__(*args, **kwargs)
        # The target(s) whose test script is running right now. _execute sets this before each
        # script and _run_node_distribution_command hands it to spawn_and_wait.
        self._currently_executing_test_targets = []

    @classmethod
    def prepare(cls, options, round_manager):
        """Declares that this task requires the NodePaths product in addition to parent needs."""
        super().prepare(options, round_manager)
        round_manager.require_data(NodePaths)

    @classmethod
    def supports_passthru_args(cls):
        """Returns True: passthrough args are forwarded to the test script by _execute."""
        return True

    def _run_node_distribution_command(self, command, workunit):
        """Overrides NodeTask._run_node_distribution_command.

        This is what is ultimately used to run the Command. It must return the return code of the
        process. The base implementation just calls command.run immediately. We override here to
        invoke TestRunnerTaskMixin.spawn_and_wait, which ultimately invokes _spawn, which finally
        calls command.run.
        """
        return self.spawn_and_wait(self._currently_executing_test_targets, command, workunit)

    def _execute(self, all_targets):
        """Implements abstract TestRunnerTaskMixin._execute.

        Runs the script named by each test target from inside the node path of the target's
        first dependency, stopping at the first script that fails.

        :param all_targets: Unused here; the targets to run come from self._get_test_targets().
        :raises TaskError: if a test script finishes with a non-zero exit code.
        """
        targets = self._get_test_targets()
        if not targets:
            return

        node_paths = self.context.products.get_data(NodePaths)

        try:
            for target in targets:
                node_module = target.dependencies[0]
                self.context.log.debug(f"Testing node module (first dependency): {node_module}")
                with pushd(node_paths.node_path(node_module)):
                    # Record the target so _run_node_distribution_command can pass it along
                    # to spawn_and_wait.
                    self._currently_executing_test_targets = [target]
                    result, test_command = self.run_script(
                        target.script_name,
                        package_manager=self.get_package_manager(target=node_module),
                        target=target,
                        script_args=self.get_passthru_args(),
                        node_paths=node_paths.all_node_paths,
                        workunit_name=target.address.reference(),
                        workunit_labels=[WorkUnitLabel.TEST],
                    )
                    if result != 0:
                        raise TaskError(
                            "test script failed:\n"
                            f"\t{test_command} failed with exit code {result}"
                        )
        finally:
            # Clear the bookkeeping on failure as well as success, so the instance never keeps
            # pointing at a target that is no longer running.
            self._currently_executing_test_targets = []

    def _spawn(self, command, workunit):
        """Implements abstract TestRunnerTaskMixin._spawn.

        Starts the command with its stdout and stderr directed to the workunit's outputs and
        returns the resulting process wrapped in a SubprocessProcessHandler.
        """
        process = command.run(stdout=workunit.output("stdout"), stderr=workunit.output("stderr"))
        return SubprocessProcessHandler(process)

    def _test_target_filter(self):
        """Implements abstract TestRunnerTaskMixin._test_target_filter.

        Returns the is_node_test predicate itself (not its result).
        """
        return self.is_node_test

    def _validate_target(self, target):
        """Implements abstract TestRunnerTaskMixin._validate_target.

        :raises TargetDefinitionException: unless the target has exactly one dependency and
          self.is_node_module accepts that dependency.
        """
        if len(target.dependencies) != 1 or not self.is_node_module(target.dependencies[0]):
            message = "NodeTest targets must depend on exactly one NodeModule target."
            raise TargetDefinitionException(target, message)