Repository URL to install this package:
|
Version:
1.26.0.dev0+git7a2db260 ▾
|
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from pants.base.exceptions import TargetDefinitionException, TaskError
from pants.base.workunit import WorkUnitLabel
from pants.task.testrunner_task_mixin import TestRunnerTaskMixin
from pants.util.contextutil import pushd
from pants.util.process_handler import SubprocessProcessHandler
from pants.contrib.node.tasks.node_paths import NodePaths
from pants.contrib.node.tasks.node_task import NodeTask
class NodeTest(TestRunnerTaskMixin, NodeTask):
"""Runs a test script from package.json in a NodeModule, currently via "npm run [script name]".
Implementations of abstract methods from TestRunnerTaskMixin: _execute, _spawn,
_test_target_filter, _validate_target
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._currently_executing_test_targets = []
@classmethod
def prepare(cls, options, round_manager):
super().prepare(options, round_manager)
round_manager.require_data(NodePaths)
@classmethod
def supports_passthru_args(cls):
return True
def _run_node_distribution_command(self, command, workunit):
"""Overrides NodeTask._run_node_distribution_command.
This is what is ultimately used to run the Command. It must return the return code of the
process. The base implementation just calls command.run immediately. We override here to
invoke TestRunnerTaskMixin.spawn_and_wait, which ultimately invokes _spawn, which finally
calls command.run.
"""
return self.spawn_and_wait(self._currently_executing_test_targets, command, workunit)
def _execute(self, all_targets):
"""Implements abstract TestRunnerTaskMixin._execute."""
targets = self._get_test_targets()
if not targets:
return
node_paths = self.context.products.get_data(NodePaths)
for target in targets:
node_module = target.dependencies[0]
self.context.log.debug(f"Testing node module (first dependency): {node_module}")
with pushd(node_paths.node_path(node_module)):
self._currently_executing_test_targets = [target]
result, test_command = self.run_script(
target.script_name,
package_manager=self.get_package_manager(target=node_module),
target=target,
script_args=self.get_passthru_args(),
node_paths=node_paths.all_node_paths,
workunit_name=target.address.reference(),
workunit_labels=[WorkUnitLabel.TEST],
)
if result != 0:
raise TaskError(
"test script failed:\n"
"\t{} failed with exit code {}".format(test_command, result)
)
self._currently_executing_test_targets = []
def _spawn(self, command, workunit):
"""Implements abstract TestRunnerTaskMixin._spawn."""
process = command.run(stdout=workunit.output("stdout"), stderr=workunit.output("stderr"))
return SubprocessProcessHandler(process)
def _test_target_filter(self):
"""Implements abstract TestRunnerTaskMixin._test_target_filter."""
return self.is_node_test
def _validate_target(self, target):
"""Implements abstract TestRunnerTaskMixin._validate_target."""
if len(target.dependencies) != 1 or not self.is_node_module(target.dependencies[0]):
message = "NodeTest targets must depend on exactly one NodeModule target."
raise TargetDefinitionException(target, message)