Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
apache-airflow / utils / file.py
Size: Mime:
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import io
import logging
import os
import re
import zipfile
from pathlib import Path
from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Pattern, Union

from airflow.configuration import conf

if TYPE_CHECKING:
    import pathlib

log = logging.getLogger(__name__)


def TemporaryDirectory(*args, **kwargs):
    """
    Deprecated shim around :class:`tempfile.TemporaryDirectory`.

    Every positional and keyword argument is forwarded unchanged, and a
    ``DeprecationWarning`` attributed to the caller is emitted first.
    """
    import tempfile
    import warnings

    message = "This function is deprecated. Please use `tempfile.TemporaryDirectory`"
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return tempfile.TemporaryDirectory(*args, **kwargs)


def mkdirs(path, mode):
    """
    Creates the directory specified by path, creating intermediate directories
    as necessary. If directory already exists, this is a no-op.

    Deprecated: use ``pathlib.Path(path).mkdir(mode=mode, parents=True, exist_ok=True)``,
    which is exactly what this function delegates to.

    :param path: The directory to create
    :type path: str
    :param mode: The mode to give to the final directory, e.g. 0o755. As with
        ``Path.mkdir`` it is combined with the process umask, and any missing
        intermediate directories are created with default permissions
        (``mode`` is not applied to them).
    :type mode: int
    :raises FileExistsError: if ``path`` exists but is not a directory.
    """
    import warnings

    warnings.warn(
        f"This function is deprecated. Please use `pathlib.Path({path}).mkdir`",
        DeprecationWarning,
        stacklevel=2,
    )
    Path(path).mkdir(mode=mode, parents=True, exist_ok=True)


# Splits "<archive>.zip<sep><member>" into (prefix-with-sep, archive, member);
# the first two groups are None when the path has no ".zip<sep>" component.
ZIP_REGEX: Pattern[str] = re.compile(r'((.*\.zip)' + re.escape(os.sep) + r')?(.*)')


def correct_maybe_zipped(fileloc):
    """
    Collapse a path that points inside a zip archive down to the archive path.

    When ``fileloc`` contains a ``<name>.zip`` folder component that is a real
    zip file, that archive path is returned. Otherwise ``fileloc`` is returned
    unchanged. Empty or falsy values are returned as-is.
    """
    if not fileloc:
        return fileloc
    archive = ZIP_REGEX.search(fileloc).group(2)
    return archive if archive and zipfile.is_zipfile(archive) else fileloc


def open_maybe_zipped(fileloc, mode='r'):
    """
    Opens the given file. If the path contains a folder with a .zip suffix, then
    the folder is treated as a zip archive, opening the file inside the archive.

    Files inside an archive are always opened read-only. The archive itself is
    opened in ``'r'`` mode whatever ``mode`` is, so a write mode can never
    truncate it. For archive members, a ``'b'`` in ``mode`` returns the raw
    binary stream; otherwise the member is wrapped in a text wrapper.

    :param fileloc: path to a regular file, or to a file inside a ``.zip`` archive
    :param mode: mode used for regular files; for archive members only the
        presence of ``'b'`` is honoured
    :return: a file object, as in `open`, or as in `ZipFile.open`.
    :raises KeyError: if the archive does not contain the requested member.
    """
    _, archive, filename = ZIP_REGEX.search(fileloc).groups()
    if archive and zipfile.is_zipfile(archive):
        # The member stream keeps its own reference to the underlying archive
        # file, so closing the ZipFile here is safe. The OS handle is then
        # released as soon as the caller closes the returned stream, instead
        # of whenever the ZipFile happens to be garbage collected.
        with zipfile.ZipFile(archive) as zip_file:
            member = zip_file.open(filename)
        if 'b' in mode:
            return member
        return io.TextIOWrapper(member)
    return open(fileloc, mode=mode)


def _load_ignore_patterns(ignore_file_path: str) -> List[Pattern[str]]:
    """Compile each non-blank, comment-stripped line of an ignore file into a regex."""
    with open(ignore_file_path) as ignore_file:
        lines = ignore_file.read().split("\n")
    compiled: List[Pattern[str]] = []
    for line in lines:
        # Drop an inline ``#`` comment, then surrounding whitespace. Stray
        # trailing blanks would otherwise become part of the regex and silently
        # stop it from matching, and blank-looking lines would become patterns.
        pattern = re.sub(r"\s*#.*", "", line).strip()
        if not pattern:
            continue
        try:
            compiled.append(re.compile(pattern))
        except re.error as err:
            # One bad line should not abort discovery of every other file.
            log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, ignore_file_path, err)
    return compiled


def find_path_from_directory(base_dir_path: str, ignore_file_name: str) -> Generator[str, None, None]:
    """
    Search the file and return the path of the file that should not be ignored.

    Walks ``base_dir_path`` recursively, following symlinks. When a directory
    contains a file named ``ignore_file_name``, every non-blank line of it (``#``
    starts a comment) is compiled as a regular expression. Those patterns apply
    to that directory and all of its subdirectories.

    A subdirectory or file is skipped when any active pattern matches
    (``re.search``) its path relative to ``base_dir_path``. Entries directly
    under ``base_dir_path`` are seen as ``./<name>``. Lines that are not valid
    regular expressions are skipped with a warning. The ignore files themselves
    are never yielded.

    :param base_dir_path: the base path to be searched for.
    :param ignore_file_name: the file name in which specifies a regular expression pattern is written.

    :return: file path not to be ignored.
    """
    base_dir = str(base_dir_path)
    patterns_by_dir: Dict[str, List[Pattern[str]]] = {}

    for root, dirs, files in os.walk(base_dir, followlinks=True):
        # Each entry is only needed when its directory is walked; pop to free it.
        patterns: List[Pattern[str]] = patterns_by_dir.pop(root, [])
        rel_root = os.path.relpath(root, base_dir)

        ignore_file_path = os.path.join(root, ignore_file_name)
        if os.path.isfile(ignore_file_path):
            patterns = list(set(patterns + _load_ignore_patterns(ignore_file_path)))

        # Prune ignored subdirectories in place so os.walk never descends into them.
        dirs[:] = [
            subdir for subdir in dirs if not any(p.search(os.path.join(rel_root, subdir)) for p in patterns)
        ]

        patterns_by_dir.update({os.path.join(root, subdir): patterns.copy() for subdir in dirs})

        for file_name in files:
            if file_name == ignore_file_name:
                continue
            rel_file_path = os.path.join(rel_root, file_name)
            if any(p.search(rel_file_path) for p in patterns):
                continue
            yield os.path.join(root, file_name)


def list_py_file_paths(
    directory: Union[str, "pathlib.Path"],
    safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE', fallback=True),
    include_examples: Optional[bool] = None,
    include_smart_sensor: Optional[bool] = conf.getboolean('smart_sensor', 'use_smart_sensor'),
):
    """
    Traverse a directory and look for Python files.

    Note that the defaults for ``safe_mode`` and ``include_smart_sensor`` are
    read from the configuration once, when this module is imported, not on
    every call. ``include_examples`` is resolved from ``core.LOAD_EXAMPLES`` at
    call time when it is left as ``None``.

    :param directory: the directory to traverse. A path to a single file is
        included as-is; ``None`` contributes no user files.
    :type directory: str or pathlib.Path
    :param safe_mode: whether to use a heuristic to determine whether a file
        contains Airflow DAG definitions. Defaults to the
        core.DAG_DISCOVERY_SAFE_MODE configuration setting (safe if not set).
    :type safe_mode: bool
    :param include_examples: include example DAGs
    :type include_examples: bool
    :param include_smart_sensor: include smart sensor native control DAGs
    :type include_smart_sensor: bool
    :return: a list of paths to Python files in the specified directory
    :rtype: list[str]
    """
    if include_examples is None:
        include_examples = conf.getboolean('core', 'LOAD_EXAMPLES')
    file_paths: List[str] = []
    if directory is not None:
        if os.path.isfile(directory):
            file_paths.append(str(directory))
        elif os.path.isdir(directory):
            file_paths.extend(find_dag_file_paths(directory, safe_mode))
    if include_examples:
        from airflow import example_dags

        example_dag_folder = example_dags.__path__[0]  # type: ignore
        # Bundled folders are scanned without recursing into examples/smart sensors again.
        file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, False, False))
    if include_smart_sensor:
        from airflow import smart_sensor_dags

        smart_sensor_dag_folder = smart_sensor_dags.__path__[0]  # type: ignore
        file_paths.extend(list_py_file_paths(smart_sensor_dag_folder, safe_mode, False, False))
    return file_paths


def find_dag_file_paths(directory: Union[str, "pathlib.Path"], safe_mode: bool) -> List[str]:
    """
    Collect every file under ``directory`` that may define DAGs.

    Candidates come from :func:`find_path_from_directory` using
    ``.airflowignore`` files. A candidate is kept when it meets all of these:

    * it is a regular file;
    * it is a ``.py`` file or a zip archive;
    * it passes :func:`might_contain_dag`.

    Any error raised while inspecting a single candidate is logged and that
    candidate is skipped.
    """
    dag_files: List[str] = []
    for candidate in find_path_from_directory(str(directory), ".airflowignore"):
        try:
            if not os.path.isfile(candidate):
                continue
            extension = os.path.splitext(candidate)[1]
            loadable = extension == '.py' or zipfile.is_zipfile(candidate)
            if loadable and might_contain_dag(candidate, safe_mode):
                dag_files.append(candidate)
        except Exception:
            log.exception("Error while examining %s", candidate)
    return dag_files


# Matches an inline ``#`` comment together with any whitespace in front of it.
COMMENT_PATTERN: Pattern[str] = re.compile(r"\s*#.*")


def might_contain_dag(file_path: str, safe_mode: bool, zip_file: Optional[zipfile.ZipFile] = None):
    """
    Cheaply guess whether a Python source may define an Airflow DAG.

    The guess is a case-insensitive check that the raw bytes contain both
    ``dag`` and ``airflow``.

    :param file_path: File to inspect. When ``zip_file`` is given, this is a
        member name inside that archive; otherwise it is a local path.
    :param safe_mode: When False the heuristic is skipped and True is returned.
    :param zip_file: Optional open archive to read ``file_path`` from.
    :return: True if the file might contain DAGs. A local path that is itself
        a zip archive is always reported as True.
    """
    if not safe_mode:
        return True
    if zip_file:
        with zip_file.open(file_path) as handle:
            raw = handle.read()
    elif zipfile.is_zipfile(file_path):
        return True
    else:
        with open(file_path, 'rb') as handle:
            raw = handle.read()
    lowered = raw.lower()
    return b'dag' in lowered and b'airflow' in lowered