# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

import os
import stat
import struct
import sys
from dataclasses import dataclass
from enum import Enum
from typing import (
    Any,
    BinaryIO,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Tuple,
    Union,
)

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000
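
# Example (illustrative): a flags value of 0x2007 encodes stage 2 in
# bits 12-13 and a name length of 7 in the low 12 bits:
#   (0x2007 & FLAG_STAGEMASK) >> FLAG_STAGESHIFT  ->  2  (MERGE_CONFLICT_THIS)
#   0x2007 & FLAG_NAMEMASK                        ->  7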

DEFAULT_VERSION = 2


class Stage(Enum):
    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3


@dataclass
class SerializedIndexEntry:
    name: bytes
    ctime: Union[int, float, Tuple[int, int]]
    mtime: Union[int, float, Tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)


@dataclass
class IndexEntry:
    ctime: Union[int, float, Tuple[int, int]]
    mtime: Union[int, float, Tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=stage.value << FLAG_STAGESHIFT,
            extended_flags=0,
        )
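
# Round-trip sketch (illustrative; the field values are made up):
#
#   entry = IndexEntry(
#       ctime=(1_600_000_000, 0), mtime=(1_600_000_000, 0),
#       dev=1, ino=2, mode=0o100644, uid=1000, gid=1000, size=5,
#       sha=b"e" * 40,
#   )
#   serialized = entry.serialize(b"foo.txt", Stage.NORMAL)
#   assert IndexEntry.from_serialized(serialized) == entry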


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> Tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args):
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
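
# Examples (illustrative):
#   pathsplit(b"foo/bar/baz")      ->  (b"foo/bar", b"baz")
#   pathsplit(b"toplevel")         ->  (b"", b"toplevel")
#   pathjoin(b"foo", b"", b"bar")  ->  b"foo/bar"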


def read_cache_time(f):
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f, t):
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
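
# The three accepted time representations (illustrative; f is any binary
# file-like object):
#   write_cache_time(f, 1_600_000_000)         # int: nanoseconds default to 0
#   write_cache_time(f, 1_600_000_000.25)      # float: split into secs/nsecs
#   write_cache_time(f, (1_600_000_000, 250))  # tuple: written as-is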


def read_cache_entry(f, version: int) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index format version
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0
    name = f.read(flags & FLAG_NAMEMASK)
    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())
    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(f, entry: SerializedIndexEntry, version: int) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: SerializedIndexEntry to write
      version: Index format version
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)
    flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")
    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))
    f.write(entry.name)
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))
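
# Worked padding example (illustrative): with version < 4, an entry that
# starts at offset 0 and whose name ends at offset 63 has
# real_size = (63 + 8) & ~7 = 64, so one NUL byte of padding is written;
# entry sizes are rounded up to a multiple of 8 bytes with at least one
# NUL terminating the name.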


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version) -> None:
        self.index_format_version = version


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError("Invalid index file header: %r" % header)
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3):
        raise UnsupportedIndexFormat(version)
    for i in range(num_entries):
        yield read_cache_entry(f, version)


def read_index_dict(f) -> Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.
       Dict Key is tuple of path and stage number, as
            path alone is not unique
    Args:
      f: File object to read fromls.
    """
    ret: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError("Non-conflicted entry for %r exists" % entry.name)
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret
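
# Usage sketch (illustrative; assumes ".git/index" exists):
#
#   with open(".git/index", "rb") as f:
#       entries = read_index_dict(f)
#   for path, value in entries.items():
#       if isinstance(value, ConflictedIndexEntry):
#           print("conflict:", path)
#       else:
#           print(path, value.sha)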


def write_index(
    f: BinaryIO, entries: List[SerializedIndexEntry], version: Optional[int] = None
):
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write
      entries: Iterable over the entries to write
    """
    if version is None:
        version = DEFAULT_VERSION
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    for entry in entries:
        write_cache_entry(f, entry, version)


def write_index_dict(
    f: BinaryIO,
    entries: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
) -> None:
    """Write an index file based on the contents of a dictionary.
    being careful to sort by path and then by stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))
    write_index(f, entries_list, version=version)


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
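
# Examples (illustrative): only the executable bit of a regular file
# survives cleanup; everything else is normalized:
#   cleanup_mode(0o100600)  ->  0o100644
#   cleanup_mode(0o100764)  ->  0o100755
#   cleanup_mode(0o120777)  ->  0o120000  (symlink)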


class Index:
    """A Git Index file."""

    _byname: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(self, filename: Union[bytes, str], read=True) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
        """
        self._filename = filename
        # TODO(jelmer): Store the version returned by read_index
        self._version = None
        self.clear()
        if read:
            self.read()

    @property
    def path(self):
        return self._filename

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            f = SHA1Writer(f)
            write_index_dict(f, self._byname, version=self._version)
        finally:
            f.close()

    def read(self):
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            f = SHA1Reader(f)
            self.update(read_index_dict(f))
            # FIXME: Additional data?
            f.read(os.path.getsize(self._filename) - f.tell() - 20)
            f.check_sha()
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either a IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key):
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[Tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self):
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        del self._byname[name]

    def iteritems(
        self
    ) -> Iterator[Tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def items(self) -> Iterator[Tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def update(self, entries: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]):
        for key, value in entries.items():
            self[key] = value

    def paths(self):
        yield from self._byname.keys()

    def changes_from_tree(
        self, object_store, tree: ObjectID, want_unchanged: bool = False
    ):
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path):
            entry = self[path]
            return entry.sha, cleanup_mode(entry.mode)

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store):
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())
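
# Usage sketch (illustrative; assumes a repository index at the given path):
#
#   index = Index(b"repo/.git/index")
#   print(len(index), "entries")
#   for path, entry in index.items():
#       print(path, entry)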


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[Tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: Dict[bytes, Any] = {b"": {}}

    def add_tree(path):
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path):
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
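
# Usage sketch (illustrative), committing a one-file tree to an in-memory
# store:
#
#   from dulwich.object_store import MemoryObjectStore
#   from dulwich.objects import Blob
#
#   store = MemoryObjectStore()
#   blob = Blob.from_string(b"hello\n")
#   store.add_object(blob)
#   tree_id = commit_tree(store, [(b"dir/hello.txt", blob.id, 0o100644)])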


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], Tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged=False,
) -> Iterable[
    Tuple[
        Tuple[Optional[bytes], Optional[bytes]],
        Tuple[Optional[int], Optional[int]],
        Tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and
    a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support an include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val,
    hex_sha: bytes,
    mode: Optional[int] = None,
):
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode, derived from stat_val if not specified
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        stat_val.st_ctime,
        stat_val.st_mtime,
        stat_val.st_dev,
        stat_val.st_ino,
        mode,
        stat_val.st_uid,
        stat_val.st_gid,
        stat_val.st_size,
        hex_sha,
    )


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        def __init__(self, errno, msg, filename) -> None:
            super(PermissionError, self).__init__(
                errno,
                "Unable to create symlink; "
                "do you have developer mode enabled? %s" % msg,
                filename,
            )

    def symlink(src, dst, target_is_directory=False, *, dir_fd=None):
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(e.errno, e.strerror, e.filename) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode=True,
    tree_encoding="utf-8",
    symlink_fn=None,
):
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: Whether to apply the mode (including the executable
        bit) to the written file, per the core.filemode setting; defaults
        to True
      tree_encoding: Encoding used to decode symlink targets on Windows
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            os.unlink(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents = contents.decode(tree_encoding)  # type: ignore
            target_path = target_path.decode(tree_encoding)  # type: ignore
        (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def validate_path_element_default(element: bytes) -> bool:
    return element.lower() not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    stripped = element.rstrip(b". ").lower()
    if stripped in INVALID_DOTNAMES:
        return False
    if stripped == b"git~1":
        return False
    return True
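
# Examples (illustrative): the NTFS variant also rejects the 8.3 short
# name for ".git" and names that Windows would collapse onto ".git":
#   validate_path_element_default(b".git")  ->  False
#   validate_path_element_default(b"src")   ->  True
#   validate_path_element_ntfs(b"git~1")    ->  False
#   validate_path_element_ntfs(b".git. ")   ->  False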


def validate_path(path: bytes, element_validator=validate_path_element_default) -> bool:
    """Validate a path by checking each of its /-separated elements.

    The default element validator refuses .git (case-insensitively),
    ".", ".." and empty components.
    """
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    return True


def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element=validate_path_element_default,
    symlink_fn=None,
):
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: Whether to apply entry modes (including the
        executable bit) to checked-out files, per the core.filemode
        setting; defaults to True
      validate_path_element: Function to validate path elements to check
        out; the default refuses .git and .. components.
      symlink_fn: Function to use for creating symlinks

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # We cannot use tuple slicing to build a new tuple,
            # because on Windows that would convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
            # default to a stage 0 index entry (normal)
            # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()
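
# Usage sketch (illustrative), materializing HEAD's tree into a target
# directory -- roughly what a fresh checkout does (assumes "target/.git"
# already exists so the index can be written):
#
#   from dulwich.repo import Repo
#
#   repo = Repo("source-repo")
#   tree_id = repo[repo.head()].tree
#   build_index_from_tree(
#       "target", "target/.git/index", repo.object_store, tree_id
#   )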


def blob_from_path_and_mode(fs_path: bytes, mode: int, tree_encoding="utf-8"):
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      mode: File mode
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob


def blob_from_path_and_stat(fs_path: bytes, st, tree_encoding="utf-8"):
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
    Returns: A `Blob` object
    """
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)


def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        return None


def _has_directory_changed(tree_path: bytes, entry):
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check whether the path is a directory. If it is a
    directory and a submodule, check the submodule head to see if it has
    changed. If it is not a submodule, consider the file changed, since
    Git tracked a file and not a directory.

    Returns True if the given path should be considered changed and False
    otherwise.
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True

    return False


def get_unstaged_changes(
    index: Index, root_path: Union[str, bytes], filter_blob_callback=None
):
    """Walk through an index and check for differences against working tree.

    Args:
      index: index to check
      root_path: path in which to find files
    Returns: iterator over paths with unstaged changes
    """
    # For each entry in the index check the sha1 & ensure not staged
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for tree_path, entry in index.iteritems():
        full_path = _tree_to_fs_path(root_path, tree_path)
        if isinstance(entry, ConflictedIndexEntry):
            # Conflicted files are always unstaged
            yield tree_path
            continue

        try:
            st = os.lstat(full_path)
            if stat.S_ISDIR(st.st_mode):
                if _has_directory_changed(tree_path, entry):
                    yield tree_path
                continue

            if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                continue

            blob = blob_from_path_and_stat(full_path, st)

            if filter_blob_callback is not None:
                blob = filter_blob_callback(blob, tree_path)
        except FileNotFoundError:
            # The file was removed, so we assume that counts as
            # different from whatever file used to exist.
            yield tree_path
        else:
            if blob.id != entry.sha:
                yield tree_path


os_sep_bytes = os.sep.encode("ascii")


def _tree_to_fs_path(root_path: bytes, tree_path: bytes):
    """Convert a git tree path to a file system path.

    Args:
      root_path: Root filesystem path
      tree_path: Git tree path as bytes

    Returns: File system path.
    """
    assert isinstance(tree_path, bytes)
    if os_sep_bytes != b"/":
        sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
    else:
        sep_corrected_path = tree_path
    return os.path.join(root_path, sep_corrected_path)


def _fs_to_tree_path(fs_path: Union[str, bytes]) -> bytes:
    """Convert a file system path to a git tree path.

    Args:
      fs_path: File system path.

    Returns:  Git tree path as bytes
    """
    if not isinstance(fs_path, bytes):
        fs_path_bytes = os.fsencode(fs_path)
    else:
        fs_path_bytes = fs_path
    if os_sep_bytes != b"/":
        tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
    else:
        tree_path = fs_path_bytes
    return tree_path
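
# Examples (illustrative; on Windows, where os.sep is "\\"):
#   _tree_to_fs_path(b"C:\\repo", b"foo/bar")  ->  b"C:\\repo\\foo\\bar"
#   _fs_to_tree_path(b"foo\\bar")              ->  b"foo/bar"
# On POSIX both conversions leave the separators untouched.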


def index_entry_from_directory(st, path: bytes) -> Optional[IndexEntry]:
    """Create an index entry for a directory if it is a submodule.

    Returns a gitlink entry pointing at the submodule HEAD, or None if
    the directory is not a git repository.
    """
    if os.path.exists(os.path.join(path, b".git")):
        head = read_submodule_head(path)
        if head is None:
            return None
        return index_entry_from_stat(st, head, mode=S_IFGITLINK)
    return None


def index_entry_from_path(
    path: bytes, object_store: Optional[ObjectContainer] = None
) -> Optional[IndexEntry]:
    """Create an index from a filesystem path.

    This returns an index value for files, symlinks
    and tree references. for directories and
    non-existent files it returns None

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to
        save new blobs in
    Returns: An index entry; None for directories
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    if stat.S_ISDIR(st.st_mode):
        return index_entry_from_directory(st, path)

    if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
        blob = blob_from_path_and_stat(path, st)
        if object_store is not None:
            object_store.add_object(blob)
        return index_entry_from_stat(st, blob.id)

    return None


def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[Tuple[bytes, Optional[IndexEntry]]]:
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry
    """
    for path in paths:
        p = _tree_to_fs_path(root_path, path)
        try:
            entry = index_entry_from_path(p, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            entry = None
        yield path, entry


def iter_fresh_objects(
    paths: Iterable[bytes], root_path: bytes, include_deleted=False, object_store=None
) -> Iterator[Tuple[bytes, Optional[bytes], Optional[int]]]:
    """Iterate over versions of objects on disk referenced by index.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and
        mode set to None
      object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
        if entry is None:
            if include_deleted:
                yield path, None, None
        else:
            yield path, entry.sha, cleanup_mode(entry.mode)


def refresh_index(index: Index, root_path: bytes):
    """Refresh the contents of an index.

    This restages tracked files from the working tree, similar to the
    staging step of running 'git commit -a'.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    for path, entry in iter_fresh_entries(index, root_path):
        if entry:
            index[path] = entry


class locked_index:
    """Lock the index while making modifications.

    Works as a context manager.
    """

    def __init__(self, path: Union[bytes, str]) -> None:
        self._path = path

    def __enter__(self):
        self._file = GitFile(self._path, "wb")
        self._index = Index(self._path)
        return self._index

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is not None:
            self._file.abort()
            return
        try:
            f = SHA1Writer(self._file)
            write_index_dict(f, self._index._byname)
        except BaseException:
            self._file.abort()
        else:
            f.close()
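
# Usage sketch (illustrative): mutate the index while holding the lock;
# the new contents are written when the block exits cleanly, and the
# lock file is aborted on error:
#
#   with locked_index(b".git/index") as index:
#       del index[b"obsolete.txt"]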