Repository URL to install this package:
| 
          
        
        Version: 
           
    
          0.1.31-1  ▾
        
   | 
# config.py - Reading and writing Git config files
# Copyright (C) 2011-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
"""Reading and writing Git configuration files.
Todo:
 * preserve formatting when updating configuration files
 * treat subsection names as case-insensitive for [branch.foo] style
   subsections
"""
import os
import sys
from contextlib import suppress
from typing import (
    Any,
    BinaryIO,
    Dict,
    Iterable,
    Iterator,
    KeysView,
    List,
    MutableMapping,
    Optional,
    Tuple,
    Union,
    overload,
)
from .file import GitFile
SENTINEL = object()
def lower_key(key):
    if isinstance(key, (bytes, str)):
        return key.lower()
    if isinstance(key, Iterable):
        return type(key)(map(lower_key, key))  # type: ignore
    return key
class CaseInsensitiveOrderedMultiDict(MutableMapping):
    def __init__(self) -> None:
        self._real: List[Any] = []
        self._keyed: Dict[Any, Any] = {}
    @classmethod
    def make(cls, dict_in=None):
        if isinstance(dict_in, cls):
            return dict_in
        out = cls()
        if dict_in is None:
            return out
        if not isinstance(dict_in, MutableMapping):
            raise TypeError
        for key, value in dict_in.items():
            out[key] = value
        return out
    def __len__(self) -> int:
        return len(self._keyed)
    def keys(self) -> KeysView[Tuple[bytes, ...]]:
        return self._keyed.keys()
    def items(self):
        return iter(self._real)
    def __iter__(self):
        return self._keyed.__iter__()
    def values(self):
        return self._keyed.values()
    def __setitem__(self, key, value) -> None:
        self._real.append((key, value))
        self._keyed[lower_key(key)] = value
    def __delitem__(self, key) -> None:
        key = lower_key(key)
        del self._keyed[key]
        for i, (actual, unused_value) in reversed(list(enumerate(self._real))):
            if lower_key(actual) == key:
                del self._real[i]
    def __getitem__(self, item):
        return self._keyed[lower_key(item)]
    def get(self, key, default=SENTINEL):
        try:
            return self[key]
        except KeyError:
            pass
        if default is SENTINEL:
            return type(self)()
        return default
    def get_all(self, key):
        key = lower_key(key)
        for actual, value in self._real:
            if lower_key(actual) == key:
                yield value
    def setdefault(self, key, default=SENTINEL):
        try:
            return self[key]
        except KeyError:
            self[key] = self.get(key, default)
        return self[key]
Name = bytes
NameLike = Union[bytes, str]
Section = Tuple[bytes, ...]
SectionLike = Union[bytes, str, Tuple[Union[bytes, str], ...]]
Value = bytes
ValueLike = Union[bytes, str]
class Config:
    """A Git configuration."""
    def get(self, section: SectionLike, name: NameLike) -> Value:
        """Retrieve the contents of a configuration setting.
        Args:
          section: Tuple with section name and optional subsection name
          name: Variable name
        Returns:
          Contents of the setting
        Raises:
          KeyError: if the value is not set
        """
        raise NotImplementedError(self.get)
    def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
        """Retrieve the contents of a multivar configuration setting.
        Args:
          section: Tuple with section name and optional subsection namee
          name: Variable name
        Returns:
          Contents of the setting as iterable
        Raises:
          KeyError: if the value is not set
        """
        raise NotImplementedError(self.get_multivar)
    @overload
    def get_boolean(self, section: SectionLike, name: NameLike, default: bool) -> bool:
        ...
    @overload
    def get_boolean(self, section: SectionLike, name: NameLike) -> Optional[bool]:
        ...
    def get_boolean(
        self, section: SectionLike, name: NameLike, default: Optional[bool] = None
    ) -> Optional[bool]:
        """Retrieve a configuration setting as boolean.
        Args:
          section: Tuple with section name and optional subsection name
          name: Name of the setting, including section and possible
            subsection.
        Returns:
          Contents of the setting
        """
        try:
            value = self.get(section, name)
        except KeyError:
            return default
        if value.lower() == b"true":
            return True
        elif value.lower() == b"false":
            return False
        raise ValueError("not a valid boolean string: %r" % value)
    def set(
        self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
    ) -> None:
        """Set a configuration value.
        Args:
          section: Tuple with section name and optional subsection namee
          name: Name of the configuration value, including section
            and optional subsection
          value: value of the setting
        """
        raise NotImplementedError(self.set)
    def items(self, section: SectionLike) -> Iterator[Tuple[Name, Value]]:
        """Iterate over the configuration pairs for a specific section.
        Args:
          section: Tuple with section name and optional subsection namee
        Returns:
          Iterator over (name, value) pairs
        """
        raise NotImplementedError(self.items)
    def sections(self) -> Iterator[Section]:
        """Iterate over the sections.
        Returns: Iterator over section tuples
        """
        raise NotImplementedError(self.sections)
    def has_section(self, name: Section) -> bool:
        """Check if a specified section exists.
        Args:
          name: Name of section to check for
        Returns:
          boolean indicating whether the section exists
        """
        return name in self.sections()
class ConfigDict(Config, MutableMapping[Section, MutableMapping[Name, Value]]):
    """Git configuration stored in a dictionary."""
    def __init__(
        self,
        values: Union[
            MutableMapping[Section, MutableMapping[Name, Value]], None
        ] = None,
        encoding: Union[str, None] = None,
    ) -> None:
        """Create a new ConfigDict."""
        if encoding is None:
            encoding = sys.getdefaultencoding()
        self.encoding = encoding
        self._values = CaseInsensitiveOrderedMultiDict.make(values)
    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._values!r})"
    def __eq__(self, other: object) -> bool:
        return isinstance(other, self.__class__) and other._values == self._values
    def __getitem__(self, key: Section) -> MutableMapping[Name, Value]:
        return self._values.__getitem__(key)
    def __setitem__(self, key: Section, value: MutableMapping[Name, Value]) -> None:
        return self._values.__setitem__(key, value)
    def __delitem__(self, key: Section) -> None:
        return self._values.__delitem__(key)
    def __iter__(self) -> Iterator[Section]:
        return self._values.__iter__()
    def __len__(self) -> int:
        return self._values.__len__()
    @classmethod
    def _parse_setting(cls, name: str):
        parts = name.split(".")
        if len(parts) == 3:
            return (parts[0], parts[1], parts[2])
        else:
            return (parts[0], None, parts[1])
    def _check_section_and_name(
        self, section: SectionLike, name: NameLike
    ) -> Tuple[Section, Name]:
        if not isinstance(section, tuple):
            section = (section,)
        checked_section = tuple(
            [
                subsection.encode(self.encoding)
                if not isinstance(subsection, bytes)
                else subsection
                for subsection in section
            ]
        )
        if not isinstance(name, bytes):
            name = name.encode(self.encoding)
        return checked_section, name
    def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
        section, name = self._check_section_and_name(section, name)
        if len(section) > 1:
            try:
                return self._values[section].get_all(name)
            except KeyError:
                pass
        return self._values[(section[0],)].get_all(name)
    def get(  # type: ignore[override]
        self,
        section: SectionLike,
        name: NameLike,
    ) -> Value:
        section, name = self._check_section_and_name(section, name)
        if len(section) > 1:
            try:
                return self._values[section][name]
            except KeyError:
                pass
        return self._values[(section[0],)][name]
    def set(
        self,
        section: SectionLike,
        name: NameLike,
        value: Union[ValueLike, bool],
    ) -> None:
        section, name = self._check_section_and_name(section, name)
        if isinstance(value, bool):
            value = b"true" if value else b"false"
        if not isinstance(value, bytes):
            value = value.encode(self.encoding)
        self._values.setdefault(section)[name] = value
    def items(  # type: ignore[override]
        self, section: Section
    ) -> Iterator[Tuple[Name, Value]]:
        return self._values.get(section).items()
    def sections(self) -> Iterator[Section]:
        return self._values.keys()
def _format_string(value: bytes) -> bytes:
    if (
        value.startswith((b" ", b"\t"))
        or value.endswith((b" ", b"\t"))
        or b"#" in value
    ):
        return b'"' + _escape_value(value) + b'"'
    else:
        return _escape_value(value)
_ESCAPE_TABLE = {
    ord(b"\\"): ord(b"\\"),
    ord(b'"'): ord(b'"'),
    ord(b"n"): ord(b"\n"),
    ord(b"t"): ord(b"\t"),
    ord(b"b"): ord(b"\b"),
}
_COMMENT_CHARS = [ord(b"#"), ord(b";")]
_WHITESPACE_CHARS = [ord(b"\t"), ord(b" ")]
def _parse_string(value: bytes) -> bytes:
    value = bytearray(value.strip())
    ret = bytearray()
    whitespace = bytearray()
    in_quotes = False
    i = 0
    while i < len(value):
        c = value[i]
        if c == ord(b"\\"):
            i += 1
            try:
                v = _ESCAPE_TABLE[value[i]]
            except IndexError as exc:
                raise ValueError(
                    "escape character in %r at %d before end of string" % (value, i)
                ) from exc
            except KeyError as exc:
                raise ValueError(
                    "escape character followed by unknown character "
                    "%s at %d in %r" % (value[i], i, value)
                ) from exc
            if whitespace:
                ret.extend(whitespace)
                whitespace = bytearray()
            ret.append(v)
        elif c == ord(b'"'):
            in_quotes = not in_quotes
        elif c in _COMMENT_CHARS and not in_quotes:
            # the rest of the line is a comment
            break
        elif c in _WHITESPACE_CHARS:
            whitespace.append(c)
        else:
            if whitespace:
                ret.extend(whitespace)
                whitespace = bytearray()
            ret.append(c)
        i += 1
    if in_quotes:
        raise ValueError("missing end quote")
    return bytes(ret)
def _escape_value(value: bytes) -> bytes:
    """Escape a value."""
    value = value.replace(b"\\", b"\\\\")
    value = value.replace(b"\r", b"\\r")
    value = value.replace(b"\n", b"\\n")
    value = value.replace(b"\t", b"\\t")
    value = value.replace(b'"', b'\\"')
    return value
def _check_variable_name(name: bytes) -> bool:
    for i in range(len(name)):
        c = name[i : i + 1]
        if not c.isalnum() and c != b"-":
            return False
    return True
def _check_section_name(name: bytes) -> bool:
    for i in range(len(name)):
        c = name[i : i + 1]
        if not c.isalnum() and c not in (b"-", b"."):
            return False
    return True
def _strip_comments(line: bytes) -> bytes:
    comment_bytes = {ord(b"#"), ord(b";")}
    quote = ord(b'"')
    string_open = False
    # Normalize line to bytearray for simple 2/3 compatibility
    for i, character in enumerate(bytearray(line)):
        # Comment characters outside balanced quotes denote comment start
        if character == quote:
            string_open = not string_open
        elif not string_open and character in comment_bytes:
            return line[:i]
    return line
def _parse_section_header_line(line: bytes) -> Tuple[Section, bytes]:
    # Parse section header ("[bla]")
    line = _strip_comments(line).rstrip()
    in_quotes = False
    escaped = False
    for i, c in enumerate(line):
        if escaped:
            escaped = False
            continue
        if c == ord(b'"'):
            in_quotes = not in_quotes
        if c == ord(b"\\"):
            escaped = True
        if c == ord(b"]") and not in_quotes:
            last = i
            break
    else:
        raise ValueError("expected trailing ]")
    pts = line[1:last].split(b" ", 1)
    line = line[last + 1 :]
    section: Section
    if len(pts) == 2:
        if pts[1][:1] != b'"' or pts[1][-1:] != b'"':
            raise ValueError("Invalid subsection %r" % pts[1])
        else:
            pts[1] = pts[1][1:-1]
        if not _check_section_name(pts[0]):
            raise ValueError("invalid section name %r" % pts[0])
        section = (pts[0], pts[1])
    else:
        if not _check_section_name(pts[0]):
            raise ValueError("invalid section name %r" % pts[0])
        pts = pts[0].split(b".", 1)
        if len(pts) == 2:
            section = (pts[0], pts[1])
        else:
            section = (pts[0],)
    return section, line
class ConfigFile(ConfigDict):
    """A Git configuration file, like .git/config or ~/.gitconfig."""
    def __init__(
        self,
        values: Union[
            MutableMapping[Section, MutableMapping[Name, Value]], None
        ] = None,
        encoding: Union[str, None] = None,
    ) -> None:
        super().__init__(values=values, encoding=encoding)
        self.path: Optional[str] = None
    @classmethod
    def from_file(cls, f: BinaryIO) -> "ConfigFile":
        """Read configuration from a file-like object."""
        ret = cls()
        section: Optional[Section] = None
        setting = None
        continuation = None
        for lineno, line in enumerate(f.readlines()):
            if lineno == 0 and line.startswith(b"\xef\xbb\xbf"):
                line = line[3:]
            line = line.lstrip()
            if setting is None:
                if len(line) > 0 and line[:1] == b"[":
                    section, line = _parse_section_header_line(line)
                    ret._values.setdefault(section)
                if _strip_comments(line).strip() == b"":
                    continue
                if section is None:
                    raise ValueError("setting %r without section" % line)
                try:
                    setting, value = line.split(b"=", 1)
                except ValueError:
                    setting = line
                    value = b"true"
                setting = setting.strip()
                if not _check_variable_name(setting):
                    raise ValueError("invalid variable name %r" % setting)
                if value.endswith(b"\\\n"):
                    continuation = value[:-2]
                elif value.endswith(b"\\\r\n"):
                    continuation = value[:-3]
                else:
                    continuation = None
                    value = _parse_string(value)
                    ret._values[section][setting] = value
                    setting = None
            else:  # continuation line
                if line.endswith(b"\\\n"):
                    continuation += line[:-2]
                elif line.endswith(b"\\\r\n"):
                    continuation += line[:-3]
                else:
                    continuation += line
                    value = _parse_string(continuation)
                    ret._values[section][setting] = value
                    continuation = None
                    setting = None
        return ret
    @classmethod
    def from_path(cls, path: str) -> "ConfigFile":
        """Read configuration from a file on disk."""
        with GitFile(path, "rb") as f:
            ret = cls.from_file(f)
            ret.path = path
            return ret
    def write_to_path(self, path: Optional[str] = None) -> None:
        """Write configuration to a file on disk."""
        if path is None:
            path = self.path
        with GitFile(path, "wb") as f:
            self.write_to_file(f)
    def write_to_file(self, f: BinaryIO) -> None:
        """Write configuration to a file-like object."""
        for section, values in self._values.items():
            try:
                section_name, subsection_name = section
            except ValueError:
                (section_name,) = section
                subsection_name = None
            if subsection_name is None:
                f.write(b"[" + section_name + b"]\n")
            else:
                f.write(b"[" + section_name + b' "' + subsection_name + b'"]\n')
            for key, value in values.items():
                value = _format_string(value)
                f.write(b"\t" + key + b" = " + value + b"\n")
def get_xdg_config_home_path(*path_segments):
    xdg_config_home = os.environ.get(
        "XDG_CONFIG_HOME",
        os.path.expanduser("~/.config/"),
    )
    return os.path.join(xdg_config_home, *path_segments)
def _find_git_in_win_path():
    for exe in ("git.exe", "git.cmd"):
        for path in os.environ.get("PATH", "").split(";"):
            if os.path.exists(os.path.join(path, exe)):
                # in windows native shells (powershell/cmd) exe path is
                # .../Git/bin/git.exe or .../Git/cmd/git.exe
                #
                # in git-bash exe path is .../Git/mingw64/bin/git.exe
                git_dir, _bin_dir = os.path.split(path)
                yield git_dir
                parent_dir, basename = os.path.split(git_dir)
                if basename == "mingw32" or basename == "mingw64":
                    yield parent_dir
                break
def _find_git_in_win_reg():
    import platform
    import winreg
    if platform.machine() == "AMD64":
        subkey = (
            "SOFTWARE\\Wow6432Node\\Microsoft\\Windows\\"
            "CurrentVersion\\Uninstall\\Git_is1"
        )
    else:
        subkey = "SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\" "Uninstall\\Git_is1"
    for key in (winreg.HKEY_CURRENT_USER, winreg.HKEY_LOCAL_MACHINE):  # type: ignore
        with suppress(OSError):
            with winreg.OpenKey(key, subkey) as k:  # type: ignore
                val, typ = winreg.QueryValueEx(k, "InstallLocation")  # type: ignore
                if typ == winreg.REG_SZ:  # type: ignore
                    yield val
# There is no set standard for system config dirs on windows. We try the
# following:
#   - %PROGRAMDATA%/Git/config - (deprecated) Windows config dir per CGit docs
#   - %PROGRAMFILES%/Git/etc/gitconfig - Git for Windows (msysgit) config dir
#     Used if CGit installation (Git/bin/git.exe) is found in PATH in the
#     system registry
def get_win_system_paths():
    if "PROGRAMDATA" in os.environ:
        yield os.path.join(os.environ["PROGRAMDATA"], "Git", "config")
    for git_dir in _find_git_in_win_path():
        yield os.path.join(git_dir, "etc", "gitconfig")
    for git_dir in _find_git_in_win_reg():
        yield os.path.join(git_dir, "etc", "gitconfig")
class StackedConfig(Config):
    """Configuration which reads from multiple config files.."""
    def __init__(
        self, backends: List[ConfigFile], writable: Optional[ConfigFile] = None
    ) -> None:
        self.backends = backends
        self.writable = writable
    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} for {self.backends!r}>"
    @classmethod
    def default(cls) -> "StackedConfig":
        return cls(cls.default_backends())
    @classmethod
    def default_backends(cls) -> List[ConfigFile]:
        """Retrieve the default configuration.
        See git-config(1) for details on the files searched.
        """
        paths = []
        paths.append(os.path.expanduser("~/.gitconfig"))
        paths.append(get_xdg_config_home_path("git", "config"))
        if "GIT_CONFIG_NOSYSTEM" not in os.environ:
            paths.append("/etc/gitconfig")
            if sys.platform == "win32":
                paths.extend(get_win_system_paths())
        backends = []
        for path in paths:
            try:
                cf = ConfigFile.from_path(path)
            except FileNotFoundError:
                continue
            backends.append(cf)
        return backends
    def get(self, section: SectionLike, name: NameLike) -> Value:
        if not isinstance(section, tuple):
            section = (section,)
        for backend in self.backends:
            try:
                return backend.get(section, name)
            except KeyError:
                pass
        raise KeyError(name)
    def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
        if not isinstance(section, tuple):
            section = (section,)
        for backend in self.backends:
            try:
                yield from backend.get_multivar(section, name)
            except KeyError:
                pass
    def set(
        self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
    ) -> None:
        if self.writable is None:
            raise NotImplementedError(self.set)
        return self.writable.set(section, name, value)
    def sections(self) -> Iterator[Section]:
        seen = set()
        for backend in self.backends:
            for section in backend.sections():
                if section not in seen:
                    seen.add(section)
                    yield section
def read_submodules(path: str) -> Iterator[Tuple[bytes, bytes, bytes]]:
    """Read a .gitmodules file."""
    cfg = ConfigFile.from_path(path)
    return parse_submodules(cfg)
def parse_submodules(config: ConfigFile) -> Iterator[Tuple[bytes, bytes, bytes]]:
    """Parse a gitmodules GitConfig file, returning submodules.
    Args:
      config: A `ConfigFile`
    Returns:
      list of tuples (submodule path, url, name),
        where name is quoted part of the section's name.
    """
    for section in config.keys():
        section_kind, section_name = section
        if section_kind == b"submodule":
            try:
                sm_path = config.get(section, b"path")
                sm_url = config.get(section, b"url")
                yield (sm_path, sm_url, section_name)
            except KeyError:
                # If either path or url is missing, just ignore this
                # submodule entry and move on to the next one. This is
                # how git itself handles malformed .gitmodule entries.
                pass
def iter_instead_of(config: Config, push: bool = False) -> Iterable[Tuple[str, str]]:
    """Iterate over insteadOf / pushInsteadOf values."""
    for section in config.sections():
        if section[0] != b"url":
            continue
        replacement = section[1]
        try:
            needles = list(config.get_multivar(section, "insteadOf"))
        except KeyError:
            needles = []
        if push:
            try:
                needles += list(config.get_multivar(section, "pushInsteadOf"))
            except KeyError:
                pass
        for needle in needles:
            assert isinstance(needle, bytes)
            yield needle.decode("utf-8"), replacement.decode("utf-8")
def apply_instead_of(config: Config, orig_url: str, push: bool = False) -> str:
    """Apply insteadOf / pushInsteadOf to a URL."""
    longest_needle = ""
    updated_url = orig_url
    for needle, replacement in iter_instead_of(config, push):
        if not orig_url.startswith(needle):
            continue
        if len(longest_needle) < len(needle):
            longest_needle = needle
            updated_url = replacement + orig_url[len(needle) :]
    return updated_url