Repository URL to install this package:
|
Version:
2.0.5 ▾
|
import abc
import logging
from itertools import chain
from typing import Dict, Iterable, Iterator, Tuple
from pythonjsonlogger.jsonlogger import RESERVED_ATTRS, JsonFormatter
# Record attribute names that the formatters configured in this module treat
# as reserved (in addition to RESERVED_ATTRS), i.e. they are not emitted as
# "extra" fields. Presumably set on records by the Elastic APM agent -- confirm.
ELASTIC_APM_ATTRS = (
    "elasticapm_labels",
    "elasticapm_service_name",
    "elasticapm_span_id",
    "elasticapm_trace_id",
    "elasticapm_transaction_id",
    "elasticapm_event_dataset",
)

# %-style layout for the plain-text formatter: one ``key=value`` pair per field.
DEFAULT_LOGGING_FORMAT = (
    "asctime=%(asctime)s "
    "levelname=%(levelname)s "
    "name=%(name)s "
    "pathname=%(pathname)s "
    "lineno=%(lineno)d "
    "message=%(message)s"
)

# %-style layout for the JSON formatter: the same fields, without the labels.
DEFAULT_JSON_LOGGING_FORMAT = (
    "%(asctime)s %(levelname)s %(name)s "
    "%(pathname)s %(lineno)d %(message)s"
)
def get_loggers(level, loggers):
    """Build a mapping of logger name to logger configuration.

    As a side effect, registers the custom level name ``"DISABLED"`` with the
    numeric value ``logging.CRITICAL + 10``.

    Args:
        level: Level assigned to every listed logger. The special value
            ``"DISABLED"`` switches to a single root-logger entry instead.
        loggers: Iterable of logger names; surrounding whitespace is stripped.
            Not consulted when ``level`` is ``"DISABLED"``.

    Returns:
        A dict keyed by logger name. Entries refer to handlers named
        ``"console"`` (normal case) or ``"null"`` (disabled case) by name only;
        those handlers must be defined by whoever consumes the result.
    """
    # addLevelName expects the numeric level first and its text name second.
    logging.addLevelName(logging.CRITICAL + 10, "DISABLED")

    if level == "DISABLED":
        return {"": {"handlers": ["null"], "level": "DEBUG", "propagate": False}}

    # Build a fresh config dict per logger so that mutating one entry later
    # cannot silently change all the others.
    return {
        name.strip(): {"handlers": ["console"], "level": level}
        for name in loggers
    }
def parse_loggers(loggers: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield ``(logger_name, level)`` pairs parsed from ``name[=level]`` specs.

    A spec without ``=`` gets the level ``"INFO"``. A spec containing more
    than one ``=`` is skipped without any error.
    """
    for spec in loggers:
        pieces = spec.split("=")
        if len(pieces) > 2:
            # Ambiguous spec such as "a=b=c": produces no output at all.
            continue
        logger_name = pieces[0]
        level = pieces[1] if len(pieces) == 2 else "INFO"
        yield logger_name, level
class StreamFormatterWithExtra(logging.Formatter):
    """Text formatter that appends non-reserved record attributes.

    After the regular formatted line, every attribute found on the record
    that is neither listed in ``self.reserved`` nor underscore-prefixed is
    appended as ``key=repr(value)``, separated by ``", "``.
    """

    def __init__(self, *args, **kwargs):
        """Accept an optional ``reserved_attrs`` keyword; forward the rest.

        ``reserved_attrs`` falls back to ``RESERVED_ATTRS`` and is removed
        from ``kwargs`` before the base class initialiser is called.
        """
        reserved_attrs = kwargs.pop("reserved_attrs", RESERVED_ATTRS)
        self.reserved = reserved_attrs
        super().__init__(*args, **kwargs)

    def get_extra(self, record: logging.LogRecord) -> Iterator[str]:
        """Yield ``key=value`` strings for the record's extra attributes."""
        for attr_name, attr_value in vars(record).items():
            if attr_name in self.reserved:
                continue
            # Only keys that offer ``startswith`` can be tested for the
            # private-name prefix; any other key type is passed through.
            if hasattr(attr_name, "startswith") and attr_name.startswith("_"):
                continue
            yield f"{attr_name}={attr_value!r}"

    def format(self, record: logging.LogRecord) -> str:
        """Return the base-formatted line followed by the extra attributes."""
        rendered = super().format(record)
        return ", ".join([rendered, *self.get_extra(record)])
class BaseExtraConfig(abc.ABC):
    """Abstract interface for objects that adjust a logging config dict."""

    @abc.abstractmethod
    def configure(self, logging_config: Dict) -> None:
        """Receive ``logging_config``; concrete subclasses must override this."""
class LogentriesConfig(BaseExtraConfig):
    """Configurer that registers a ``logentries`` handler in a logging config."""

    def __init__(self, token: str, formatter: str = "json"):
        """Store the handler token and the name of the formatter to use."""
        self.token, self.formatter = token, formatter

    def configure(self, logging_config: Dict) -> None:
        """Add the ``logentries`` handler to ``logging_config`` in place.

        Does nothing when the token is falsy. Otherwise the handler name is
        appended to the root logger's handler list (unless already present)
        and the handler definition is written under ``handlers``.
        """
        if not self.token:
            return

        root_handlers = logging_config["root"]["handlers"]
        if "logentries" not in root_handlers:
            root_handlers.append("logentries")

        logging_config["handlers"]["logentries"] = {
            "class": "logentries.LogentriesHandler",
            "token": self.token,
            "formatter": self.formatter,
        }
def enable_logentries(logging_config: Dict, logentries_token: str) -> None:
    """Apply a ``LogentriesConfig`` built from the token to ``logging_config``.

    The configurer is created with its default formatter.
    """
    LogentriesConfig(logentries_token).configure(logging_config)
def get_logging_config(
    loggers: Iterable[str],
    *,
    log_default_format=None,
    log_json_format=None,
    log_stream_handler=None,
    extra_config: Iterable[BaseExtraConfig] = (),
    **kwargs,
) -> Dict:
    """Assemble a logging configuration dict (dictConfig layout, version 1).

    Args:
        loggers: ``name`` or ``name=LEVEL`` specs, parsed by ``parse_loggers``.
        log_default_format: Format string for the ``default`` (text) formatter;
            falls back to ``DEFAULT_LOGGING_FORMAT``.
        log_json_format: Format string for the ``json`` formatter; falls back
            to ``DEFAULT_JSON_LOGGING_FORMAT``.
        log_stream_handler: Despite its name, the *formatter* name used by the
            default stream handler; falls back to ``"default"``.
        extra_config: Configurers applied, in order, to the assembled dict.
        **kwargs: Only ``logentries_token`` is read; when truthy, the
            logentries handler is enabled. Any other keyword is ignored.

    Returns:
        The configuration dict, after all configurers have been applied.
    """
    # Unpack instead of using ``+`` so the merge works whether RESERVED_ATTRS
    # is a tuple or a list (``list + tuple`` raises TypeError).
    reserved_attrs = (*RESERVED_ATTRS, *ELASTIC_APM_ATTRS)

    logging_config = {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "default": {
                "()": StreamFormatterWithExtra,
                "reserved_attrs": reserved_attrs,
                "format": log_default_format or DEFAULT_LOGGING_FORMAT,
            },
            "json": {
                "()": JsonFormatter,
                "reserved_attrs": reserved_attrs,
                "format": log_json_format or DEFAULT_JSON_LOGGING_FORMAT,
            },
        },
        "handlers": {
            "default": {
                "class": "logging.StreamHandler",
                "formatter": log_stream_handler or "default",
            },
        },
        "root": {
            "handlers": ["default"],
        },
        "loggers": {
            name: {"level": level} for name, level in parse_loggers(loggers)
        },
    }

    for configurer in extra_config:
        configurer.configure(logging_config)

    # The token shortcut is applied after the explicit configurers.
    if logentries_token := kwargs.get("logentries_token"):
        enable_logentries(logging_config, logentries_token)

    return logging_config