Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
apache-airflow / utils / log / secrets_masker.py
Size: Mime:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Mask sensitive information from logs"""
import collections
import logging
import re
from typing import TYPE_CHECKING, Iterable, Optional, Set, TypeVar, Union

from airflow.compat.functools import cache, cached_property

if TYPE_CHECKING:
    from airflow.typing_compat import RePatternType

    RedactableItem = TypeVar('RedactableItem')


log = logging.getLogger(__name__)


DEFAULT_SENSITIVE_FIELDS = frozenset(
    {
        'access_token',
        'api_key',
        'apikey',
        'authorization',
        'passphrase',
        'passwd',
        'password',
        'private_key',
        'secret',
        'token',
    }
)
"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive"""


@cache
def get_sensitive_variables_fields():
    """Get comma-separated sensitive Variable Fields from airflow.cfg."""
    from airflow.configuration import conf

    sensitive_fields = DEFAULT_SENSITIVE_FIELDS.copy()
    sensitive_variable_fields = conf.get('core', 'sensitive_var_conn_names')
    if sensitive_variable_fields:
        sensitive_fields |= frozenset({field.strip() for field in sensitive_variable_fields.split(',')})
    return sensitive_fields


def should_hide_value_for_key(name):
    """Should the value for this given name (Variable name, or key in conn.extra_dejson) be hidden"""
    from airflow import settings

    if isinstance(name, str) and settings.HIDE_SENSITIVE_VAR_CONN_FIELDS:
        name = name.strip().lower()
        return any(s in name for s in get_sensitive_variables_fields())
    return False


def mask_secret(secret: Union[str, dict, Iterable], name: str = None) -> None:
    """
    Mask a secret from appearing in the task logs.

    If ``name`` is provided, then it will only be masked if the name matches
    one of the configured "sensitive" names.

    If ``secret`` is a dict or a iterable (excluding str) then it will be
    recursively walked and keys with sensitive names will be hidden.
    """
    # Delay import
    from airflow import settings

    # Filtering all log messages is not a free process, so we only do it when
    # running tasks
    if not settings.MASK_SECRETS_IN_LOGS or not secret:
        return

    _secrets_masker().add_mask(secret, name)


def redact(value: "RedactableItem", name: str = None) -> "RedactableItem":
    """Redact any secrets found in ``value``."""
    return _secrets_masker().redact(value, name)


@cache
def _secrets_masker() -> "SecretsMasker":

    for flt in logging.getLogger('airflow.task').filters:
        if isinstance(flt, SecretsMasker):
            return flt
    raise RuntimeError(
        "Logging Configuration Error! No SecretsMasker found! If you have custom logging, please make "
        "sure you configure it taking airflow configuration as a base as explained at "
        "https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/logging-tasks.html"
        "#advanced-configuration"
    )


class SecretsMasker(logging.Filter):
    """Redact secrets from logs"""

    replacer: Optional["RePatternType"] = None
    patterns: Set[str]

    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    MAX_RECURSION_DEPTH = 5

    def __init__(self):
        super().__init__()
        self.patterns = set()

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        # Doing log.info(..., extra={'foo': 2}) sets extra properties on
        # record, i.e. record.foo. And we need to filter those too. Fun
        #
        # Create a record, and look at what attributes are on it, and ignore
        # all the default ones!

        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            tuple(),
            exc_info=None,
            func="funcname",
        )
        return frozenset(record.__dict__).difference({'msg', 'args'})

    def _redact_exception_with_context(self, exception):
        exception.args = (self.redact(v) for v in exception.args)
        if exception.__context__:
            self._redact_exception_with_context(exception.__context__)
        if exception.__cause__ and exception.__cause__ is not exception.__context__:
            self._redact_exception_with_context(exception.__cause__)

    def filter(self, record) -> bool:
        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            for k, v in record.__dict__.items():
                if k in self._record_attrs_to_ignore:
                    continue
                record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                exc = record.exc_info[1]
                self._redact_exception_with_context(exc)
        record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    def _redact_all(self, item: "RedactableItem", depth: int) -> "RedactableItem":
        if depth > self.MAX_RECURSION_DEPTH or isinstance(item, str):
            return '***'
        if isinstance(item, dict):
            return {dict_key: self._redact_all(subval, depth + 1) for dict_key, subval in item.items()}
        elif isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(self._redact_all(subval, depth + 1) for subval in item)
        elif isinstance(item, list):
            return list(self._redact_all(subval, depth + 1) for subval in item)
        else:
            return item

    def _redact(self, item: "RedactableItem", name: Optional[str], depth: int) -> "RedactableItem":
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > self.MAX_RECURSION_DEPTH:
            return item
        try:
            if name and should_hide_value_for_key(name):
                return self._redact_all(item, depth)
            if isinstance(item, dict):
                return {
                    dict_key: self._redact(subval, name=dict_key, depth=(depth + 1))
                    for dict_key, subval in item.items()
                }
            elif isinstance(item, str):
                if self.replacer:
                    # We can't replace specific values, but the key-based redacting
                    # can still happen, so we can't short-circuit, we need to walk
                    # the structure.
                    return self.replacer.sub('***', item)
                return item
            elif isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(self._redact(subval, name=None, depth=(depth + 1)) for subval in item)
            elif isinstance(item, list):
                return [self._redact(subval, name=None, depth=(depth + 1)) for subval in item]
            else:
                return item
        # I think this should never happen, but it does not hurt to leave it just in case
        except Exception as e:
            log.warning(
                "Unable to redact %r, please report this via <https://github.com/apache/airflow/issues>. "
                "Error was: %s: %s",
                item,
                type(e).__name__,
                str(e),
            )
            return item

    def redact(self, item: "RedactableItem", name: Optional[str] = None) -> "RedactableItem":
        """Redact an any secrets found in ``item``, if it is a string.

        If ``name`` is given, and it's a "sensitive" name (see
        :func:`should_hide_value_for_key`) then all string values in the item
        is redacted.
        """
        return self._redact(item, name, depth=0)

    def add_mask(self, secret: Union[str, dict, Iterable], name: str = None):
        """Add a new secret to be masked to this filter instance."""
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret:
                return
            pattern = re.escape(secret)
            if pattern not in self.patterns and (not name or should_hide_value_for_key(name)):
                self.patterns.add(pattern)
                self.replacer = re.compile('|'.join(self.patterns))
        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)