Repository URL to install this package:
|
Version:
5.3.2 ▾
|
pytest
/
reports.py
|
|---|
from io import StringIO
from pprint import pprint
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
import py
from _pytest._code.code import ExceptionChainRepr
from _pytest._code.code import ExceptionInfo
from _pytest._code.code import ReprEntry
from _pytest._code.code import ReprEntryNative
from _pytest._code.code import ReprExceptionInfo
from _pytest._code.code import ReprFileLocation
from _pytest._code.code import ReprFuncArgs
from _pytest._code.code import ReprLocals
from _pytest._code.code import ReprTraceback
from _pytest._code.code import TerminalRepr
from _pytest.nodes import Node
from _pytest.outcomes import skip
from _pytest.pathlib import Path
def getslaveinfoline(node):
try:
return node._slaveinfocache
except AttributeError:
d = node.slaveinfo
ver = "%s.%s.%s" % d["version_info"][:3]
node._slaveinfocache = s = "[{}] {} -- Python {} {}".format(
d["id"], d["sysplatform"], ver, d["executable"]
)
return s
class BaseReport:
when = None # type: Optional[str]
location = None # type: Optional[Tuple[str, Optional[int], str]]
longrepr = None
sections = [] # type: List[Tuple[str, str]]
nodeid = None # type: str
def __init__(self, **kw):
self.__dict__.update(kw)
def toterminal(self, out) -> None:
if hasattr(self, "node"):
out.line(getslaveinfoline(self.node)) # type: ignore
longrepr = self.longrepr
if longrepr is None:
return
if hasattr(longrepr, "toterminal"):
longrepr.toterminal(out)
else:
try:
out.line(longrepr)
except UnicodeEncodeError:
out.line("<unprintable longrepr>")
def get_sections(self, prefix):
for name, content in self.sections:
if name.startswith(prefix):
yield prefix, content
@property
def longreprtext(self):
"""
Read-only property that returns the full string representation
of ``longrepr``.
.. versionadded:: 3.0
"""
tw = py.io.TerminalWriter(stringio=True)
tw.hasmarkup = False
self.toterminal(tw)
exc = tw.stringio.getvalue()
return exc.strip()
@property
def caplog(self):
"""Return captured log lines, if log capturing is enabled
.. versionadded:: 3.5
"""
return "\n".join(
content for (prefix, content) in self.get_sections("Captured log")
)
@property
def capstdout(self):
"""Return captured text from stdout, if capturing is enabled
.. versionadded:: 3.0
"""
return "".join(
content for (prefix, content) in self.get_sections("Captured stdout")
)
@property
def capstderr(self):
"""Return captured text from stderr, if capturing is enabled
.. versionadded:: 3.0
"""
return "".join(
content for (prefix, content) in self.get_sections("Captured stderr")
)
passed = property(lambda x: x.outcome == "passed")
failed = property(lambda x: x.outcome == "failed")
skipped = property(lambda x: x.outcome == "skipped")
@property
def fspath(self):
return self.nodeid.split("::")[0]
@property
def count_towards_summary(self):
"""
**Experimental**
Returns True if this report should be counted towards the totals shown at the end of the
test session: "1 passed, 1 failure, etc".
.. note::
This function is considered **experimental**, so beware that it is subject to changes
even in patch releases.
"""
return True
@property
def head_line(self):
"""
**Experimental**
Returns the head line shown with longrepr output for this report, more commonly during
traceback representation during failures::
________ Test.foo ________
In the example above, the head_line is "Test.foo".
.. note::
This function is considered **experimental**, so beware that it is subject to changes
even in patch releases.
"""
if self.location is not None:
fspath, lineno, domain = self.location
return domain
def _get_verbose_word(self, config):
_category, _short, verbose = config.hook.pytest_report_teststatus(
report=self, config=config
)
return verbose
def _to_json(self):
"""
This was originally the serialize_report() function from xdist (ca03269).
Returns the contents of this report as a dict of builtin entries, suitable for
serialization.
Experimental method.
"""
return _report_to_json(self)
@classmethod
def _from_json(cls, reportdict):
"""
This was originally the serialize_report() function from xdist (ca03269).
Factory method that returns either a TestReport or CollectReport, depending on the calling
class. It's the callers responsibility to know which class to pass here.
Experimental method.
"""
kwargs = _report_kwargs_from_json(reportdict)
return cls(**kwargs)
def _report_unserialization_failure(type_name, report_class, reportdict):
url = "https://github.com/pytest-dev/pytest/issues"
stream = StringIO()
pprint("-" * 100, stream=stream)
pprint("INTERNALERROR: Unknown entry type returned: %s" % type_name, stream=stream)
pprint("report_name: %s" % report_class, stream=stream)
pprint(reportdict, stream=stream)
pprint("Please report this bug at %s" % url, stream=stream)
pprint("-" * 100, stream=stream)
raise RuntimeError(stream.getvalue())
class TestReport(BaseReport):
""" Basic test report object (also used for setup and teardown calls if
they fail).
"""
__test__ = False
def __init__(
self,
nodeid,
location: Tuple[str, Optional[int], str],
keywords,
outcome,
longrepr,
when,
sections=(),
duration=0,
user_properties=None,
**extra
) -> None:
#: normalized collection node id
self.nodeid = nodeid
#: a (filesystempath, lineno, domaininfo) tuple indicating the
#: actual location of a test item - it might be different from the
#: collected one e.g. if a method is inherited from a different module.
self.location = location # type: Tuple[str, Optional[int], str]
#: a name -> value dictionary containing all keywords and
#: markers associated with a test invocation.
self.keywords = keywords
#: test outcome, always one of "passed", "failed", "skipped".
self.outcome = outcome
#: None or a failure representation.
self.longrepr = longrepr
#: one of 'setup', 'call', 'teardown' to indicate runtest phase.
self.when = when
#: user properties is a list of tuples (name, value) that holds user
#: defined properties of the test
self.user_properties = list(user_properties or [])
#: list of pairs ``(str, str)`` of extra information which needs to
#: marshallable. Used by pytest to add captured text
#: from ``stdout`` and ``stderr``, but may be used by other plugins
#: to add arbitrary information to reports.
self.sections = list(sections)
#: time it took to run just the test
self.duration = duration
self.__dict__.update(extra)
def __repr__(self):
return "<{} {!r} when={!r} outcome={!r}>".format(
self.__class__.__name__, self.nodeid, self.when, self.outcome
)
@classmethod
def from_item_and_call(cls, item, call):
"""
Factory method to create and fill a TestReport with standard item and call info.
"""
when = call.when
duration = call.stop - call.start
keywords = {x: 1 for x in item.keywords}
excinfo = call.excinfo
sections = []
if not call.excinfo:
outcome = "passed"
longrepr = None
else:
if not isinstance(excinfo, ExceptionInfo):
outcome = "failed"
longrepr = excinfo
# Type ignored -- see comment where skip.Exception is defined.
elif excinfo.errisinstance(skip.Exception): # type: ignore
outcome = "skipped"
r = excinfo._getreprcrash()
longrepr = (str(r.path), r.lineno, r.message)
else:
outcome = "failed"
if call.when == "call":
longrepr = item.repr_failure(excinfo)
else: # exception in setup or teardown
longrepr = item._repr_failure_py(
excinfo, style=item.config.getoption("tbstyle", "auto")
)
for rwhen, key, content in item._report_sections:
sections.append(("Captured {} {}".format(key, rwhen), content))
return cls(
item.nodeid,
item.location,
keywords,
outcome,
longrepr,
when,
sections,
duration,
user_properties=item.user_properties,
)
class CollectReport(BaseReport):
when = "collect"
def __init__(
self, nodeid: str, outcome, longrepr, result: List[Node], sections=(), **extra
) -> None:
self.nodeid = nodeid
self.outcome = outcome
self.longrepr = longrepr
self.result = result or []
self.sections = list(sections)
self.__dict__.update(extra)
@property
def location(self):
return (self.fspath, None, self.fspath)
def __repr__(self):
return "<CollectReport {!r} lenresult={} outcome={!r}>".format(
self.nodeid, len(self.result), self.outcome
)
class CollectErrorRepr(TerminalRepr):
def __init__(self, msg):
self.longrepr = msg
def toterminal(self, out) -> None:
out.line(self.longrepr, red=True)
def pytest_report_to_serializable(report):
if isinstance(report, (TestReport, CollectReport)):
data = report._to_json()
data["$report_type"] = report.__class__.__name__
return data
def pytest_report_from_serializable(data):
if "$report_type" in data:
if data["$report_type"] == "TestReport":
return TestReport._from_json(data)
elif data["$report_type"] == "CollectReport":
return CollectReport._from_json(data)
assert False, "Unknown report_type unserialize data: {}".format(
data["$report_type"]
)
def _report_to_json(report):
"""
This was originally the serialize_report() function from xdist (ca03269).
Returns the contents of this report as a dict of builtin entries, suitable for
serialization.
"""
def serialize_repr_entry(entry):
entry_data = {"type": type(entry).__name__, "data": entry.__dict__.copy()}
for key, value in entry_data["data"].items():
if hasattr(value, "__dict__"):
entry_data["data"][key] = value.__dict__.copy()
return entry_data
def serialize_repr_traceback(reprtraceback):
result = reprtraceback.__dict__.copy()
result["reprentries"] = [
serialize_repr_entry(x) for x in reprtraceback.reprentries
]
return result
def serialize_repr_crash(reprcrash):
return reprcrash.__dict__.copy()
def serialize_longrepr(rep):
result = {
"reprcrash": serialize_repr_crash(rep.longrepr.reprcrash),
"reprtraceback": serialize_repr_traceback(rep.longrepr.reprtraceback),
"sections": rep.longrepr.sections,
}
if isinstance(rep.longrepr, ExceptionChainRepr):
result["chain"] = []
for repr_traceback, repr_crash, description in rep.longrepr.chain:
result["chain"].append(
(
serialize_repr_traceback(repr_traceback),
serialize_repr_crash(repr_crash),
description,
)
)
else:
result["chain"] = None
return result
d = report.__dict__.copy()
if hasattr(report.longrepr, "toterminal"):
if hasattr(report.longrepr, "reprtraceback") and hasattr(
report.longrepr, "reprcrash"
):
d["longrepr"] = serialize_longrepr(report)
else:
d["longrepr"] = str(report.longrepr)
else:
d["longrepr"] = report.longrepr
for name in d:
if isinstance(d[name], (py.path.local, Path)):
d[name] = str(d[name])
elif name == "result":
d[name] = None # for now
return d
def _report_kwargs_from_json(reportdict):
"""
This was originally the serialize_report() function from xdist (ca03269).
Returns **kwargs that can be used to construct a TestReport or CollectReport instance.
"""
def deserialize_repr_entry(entry_data):
data = entry_data["data"]
entry_type = entry_data["type"]
if entry_type == "ReprEntry":
reprfuncargs = None
reprfileloc = None
reprlocals = None
if data["reprfuncargs"]:
reprfuncargs = ReprFuncArgs(**data["reprfuncargs"])
if data["reprfileloc"]:
reprfileloc = ReprFileLocation(**data["reprfileloc"])
if data["reprlocals"]:
reprlocals = ReprLocals(data["reprlocals"]["lines"])
reprentry = ReprEntry(
lines=data["lines"],
reprfuncargs=reprfuncargs,
reprlocals=reprlocals,
filelocrepr=reprfileloc,
style=data["style"],
) # type: Union[ReprEntry, ReprEntryNative]
elif entry_type == "ReprEntryNative":
reprentry = ReprEntryNative(data["lines"])
else:
_report_unserialization_failure(entry_type, TestReport, reportdict)
return reprentry
def deserialize_repr_traceback(repr_traceback_dict):
repr_traceback_dict["reprentries"] = [
deserialize_repr_entry(x) for x in repr_traceback_dict["reprentries"]
]
return ReprTraceback(**repr_traceback_dict)
def deserialize_repr_crash(repr_crash_dict):
return ReprFileLocation(**repr_crash_dict)
if (
reportdict["longrepr"]
and "reprcrash" in reportdict["longrepr"]
and "reprtraceback" in reportdict["longrepr"]
):
reprtraceback = deserialize_repr_traceback(
reportdict["longrepr"]["reprtraceback"]
)
reprcrash = deserialize_repr_crash(reportdict["longrepr"]["reprcrash"])
if reportdict["longrepr"]["chain"]:
chain = []
for repr_traceback_data, repr_crash_data, description in reportdict[
"longrepr"
]["chain"]:
chain.append(
(
deserialize_repr_traceback(repr_traceback_data),
deserialize_repr_crash(repr_crash_data),
description,
)
)
exception_info = ExceptionChainRepr(
chain
) # type: Union[ExceptionChainRepr,ReprExceptionInfo]
else:
exception_info = ReprExceptionInfo(reprtraceback, reprcrash)
for section in reportdict["longrepr"]["sections"]:
exception_info.addsection(*section)
reportdict["longrepr"] = exception_info
return reportdict