Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / dulwich   python

Repository URL to install this package:

/ porcelain.py

# porcelain.py -- Porcelain-like layer on top of Dulwich
# Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Simple wrapper that provides porcelain-like functions on top of Dulwich.

Currently implemented:
 * archive
 * add
 * branch{_create,_delete,_list}
 * check-ignore
 * checkout
 * clone
 * commit
 * commit-tree
 * daemon
 * describe
 * diff-tree
 * fetch
 * init
 * ls-files
 * ls-remote
 * ls-tree
 * pull
 * push
 * rm
 * remote{_add}
 * receive-pack
 * reset
 * rev-list
 * tag{_create,_delete,_list}
 * upload-pack
 * update-server-info
 * status
 * symbolic-ref

These functions are meant to behave similarly to the git subcommands.
Differences in behaviour are considered bugs.

Functions should generally accept both unicode strings and bytestrings
"""

from collections import namedtuple
from contextlib import (
    closing,
    contextmanager,
)
from io import BytesIO, RawIOBase
import datetime
import os
import posixpath
import stat
import sys
import time

from dulwich.archive import (
    tar_stream,
    )
from dulwich.client import (
    get_transport_and_path,
    )
from dulwich.config import (
    StackedConfig,
    )
from dulwich.diff_tree import (
    CHANGE_ADD,
    CHANGE_DELETE,
    CHANGE_MODIFY,
    CHANGE_RENAME,
    CHANGE_COPY,
    RENAME_CHANGE_TYPES,
    )
from dulwich.errors import (
    SendPackError,
    UpdateRefsError,
    )
from dulwich.ignore import IgnoreFilterManager
from dulwich.index import (
    blob_from_path_and_stat,
    get_unstaged_changes,
    )
from dulwich.object_store import (
    tree_lookup_path,
    )
from dulwich.objects import (
    Commit,
    Tag,
    format_timezone,
    parse_timezone,
    pretty_format_tree_entry,
    )
from dulwich.objectspec import (
    parse_commit,
    parse_object,
    parse_ref,
    parse_reftuples,
    parse_tree,
    )
from dulwich.pack import (
    write_pack_index,
    write_pack_objects,
    )
from dulwich.patch import write_tree_diff
from dulwich.protocol import (
    Protocol,
    ZERO_SHA,
    )
from dulwich.refs import (
    ANNOTATED_TAG_SUFFIX,
    strip_peeled_refs,
)
from dulwich.repo import (BaseRepo, Repo)
from dulwich.server import (
    FileSystemBackend,
    TCPGitServer,
    ReceivePackHandler,
    UploadPackHandler,
    update_server_info as server_update_server_info,
    )


# Named tuple describing status output; its fields are, in order,
# ``staged``, ``unstaged`` and ``untracked``.
GitStatus = namedtuple(
    'GitStatus', ['staged', 'unstaged', 'untracked'])


class NoneStream(RawIOBase):
    """Do-nothing stream used as a fallback when stdout/stderr are missing.

    Every operation below ignores its arguments and returns ``None``.
    """

    def read(self, size=-1):
        """Ignore ``size``; no data is ever produced."""
        return None

    def readall(self):
        """Return ``None``; there is nothing to read."""
        return None

    def readinto(self, b):
        """Leave the buffer ``b`` untouched and return ``None``."""
        return None

    def write(self, b):
        """Discard ``b`` and return ``None``."""
        return None


# Default byte streams for output and errors.  The ``buffer`` attribute of
# sys.stdout / sys.stderr is used when the stream has one, otherwise the
# stream object itself; a falsy result (for instance when the stream is
# None) is replaced by a NoneStream that ignores everything.
default_bytes_out_stream = (
    getattr(sys.stdout, 'buffer', sys.stdout) or NoneStream())
default_bytes_err_stream = (
    getattr(sys.stderr, 'buffer', sys.stderr) or NoneStream())


# Fallback text encoding, used when a caller does not pass an explicit one.
DEFAULT_ENCODING = 'utf-8'


class RemoteExists(Exception):
    """Error raised when the remote already exists."""


def open_repo(path_or_repo):
    """Return a repository for an argument that is a repository or a path.

    :param path_or_repo: A ``BaseRepo`` instance, or a path handed to ``Repo``
    :return: ``path_or_repo`` itself when it is already a ``BaseRepo``,
        otherwise a newly constructed ``Repo`` for that path
    """
    already_open = isinstance(path_or_repo, BaseRepo)
    return path_or_repo if already_open else Repo(path_or_repo)


@contextmanager
def _noop_context_manager(obj):
    """Context manager that has the same api as closing but does nothing."""
    yield obj


def open_repo_closing(path_or_repo):
    """Return a context manager yielding a repository.

    :param path_or_repo: A ``BaseRepo`` instance, or a path handed to ``Repo``
    :return: When given a path, a ``closing`` wrapper around a newly opened
        ``Repo`` so that it is closed on exit; when given a ``BaseRepo``, a
        context manager that yields it and does nothing on exit
    """
    if not isinstance(path_or_repo, BaseRepo):
        # We opened it, so we are responsible for closing it.
        return closing(Repo(path_or_repo))
    # The caller owns an already-open repository; leave it open.
    return _noop_context_manager(path_or_repo)


def path_to_tree_path(repopath, path):
    """Convert a path to a path usable in an index, e.g. bytes and relative to
    the repository root.

    Text arguments are encoded with the filesystem encoding; bytes arguments
    are used as given.

    :param repopath: Repository path, absolute or relative to the cwd
    :param path: A path, absolute or relative to the cwd
    :return: A path formatted for use in e.g. an index: bytes, relative to
        ``repopath`` and using ``/`` as separator
    :raise ValueError: If ``path`` lies outside of ``repopath``
    """
    fs_encoding = sys.getfilesystemencoding()
    if not isinstance(path, bytes):
        path = path.encode(fs_encoding)
    if not isinstance(repopath, bytes):
        repopath = repopath.encode(fs_encoding)
    treepath = os.path.relpath(path, repopath)
    sep = os.path.sep.encode('ascii')
    # Only a leading ".." *component* means the path escapes the repository.
    # A plain prefix check would also reject legitimate names that merely
    # begin with two dots, such as "..hidden".
    if treepath == b'..' or treepath.startswith(b'..' + sep):
        raise ValueError('Path not in repo')
    if sep != b'/':
        # Index paths always use forward slashes, whatever the platform.
        treepath = treepath.replace(sep, b'/')
    return treepath


def archive(repo, committish=None, outstream=default_bytes_out_stream,
            errstream=default_bytes_err_stream):
    """Write a tar archive of a commit's tree to a stream.

    :param repo: Path of repository for which to generate an archive.
    :param committish: Commit SHA1 or ref to use; "HEAD" when None
    :param outstream: Output stream (defaults to stdout)
    :param errstream: Error stream (defaults to stderr); not used by this
        function
    """
    target = "HEAD" if committish is None else committish
    with open_repo_closing(repo) as repo_obj:
        commit_obj = parse_commit(repo_obj, target)
        store = repo_obj.object_store
        tree = store[commit_obj.tree]
        # Forward each chunk as soon as tar_stream produces it.
        for chunk in tar_stream(store, tree, commit_obj.commit_time):
            outstream.write(chunk)


def update_server_info(repo="."):
    """Refresh the server info files of a repository.

    :param repo: path to the repository (or an already opened repository)
    """
    with open_repo_closing(repo) as repo_obj:
        # Delegates to dulwich.server.update_server_info.
        server_update_server_info(repo_obj)


def symbolic_ref(repo, ref_name, force=False):
    """Make HEAD a symbolic ref to the given ref.

    :param repo: path to the repository
    :param ref_name: short name of the new ref; it is converted to a full
        ref path with ``_make_branch_ref``
    :param force: force settings without checking if it exists in refs/heads
    :raise ValueError: If ``force`` is false and the resulting ref path is not
        among the repository's refs
    """
    with open_repo_closing(repo) as repo_obj:
        ref_path = _make_branch_ref(ref_name)
        if not force:
            known_refs = repo_obj.refs.keys()
            if ref_path not in known_refs:
                raise ValueError('fatal: ref `%s` is not a ref' % ref_name)
        repo_obj.refs.set_symbolic_ref(b'HEAD', ref_path)


def commit(repo=".", message=None, author=None, committer=None, encoding=None):
    """Create a new commit.

    Any of ``message``, ``author`` and ``committer`` that has an ``encode``
    method is encoded before being passed on, using ``encoding`` or, when
    that is not set, ``DEFAULT_ENCODING``.

    :param repo: Path to repository
    :param message: Optional commit message
    :param author: Optional author name and email
    :param committer: Optional committer name and email
    :param encoding: Optional encoding; also passed through to ``do_commit``
    :return: SHA1 of the new commit
    """
    # FIXME: Support --all argument
    # FIXME: Support --signoff argument
    def _encoded(value):
        # Values without a truthy ``encode`` attribute (e.g. None) pass
        # through untouched.
        if getattr(value, 'encode', None):
            return value.encode(encoding or DEFAULT_ENCODING)
        return value

    message = _encoded(message)
    author = _encoded(author)
    committer = _encoded(committer)
    with open_repo_closing(repo) as repo_obj:
        return repo_obj.do_commit(
            message=message, author=author, committer=committer,
            encoding=encoding)


def commit_tree(repo, tree, message=None, author=None, committer=None):
    """Create a new commit object for an existing tree.

    :param repo: Path to repository
    :param tree: An existing tree object
    :param message: Optional commit message
    :param author: Optional author name and email
    :param committer: Optional committer name and email
    :return: The value returned by the repository's ``do_commit``
    """
    commit_kwargs = dict(
        message=message, tree=tree, committer=committer, author=author)
    with open_repo_closing(repo) as repo_obj:
        return repo_obj.do_commit(**commit_kwargs)


def init(path=".", bare=False):
    """Create a new git repository.

    The directory ``path`` is created first if it does not exist yet.

    :param path: Path to repository.
    :param bare: Whether to create a bare repository.
    :return: A Repo instance
    """
    if not os.path.exists(path):
        os.mkdir(path)

    # Pick the matching constructor, then build the repository at ``path``.
    create = Repo.init_bare if bare else Repo.init
    return create(path)


def clone(source, target=None, bare=False, checkout=None,
          errstream=default_bytes_err_stream, outstream=None,
          origin=b"origin", depth=None, **kwargs):
    """Clone a local or remote git repository.

    :param source: Path or URL for source repository
    :param target: Path to target repository (optional)
    :param bare: Whether or not to create a bare repository
    :param checkout: Whether or not to check-out HEAD after cloning
    :param errstream: Optional stream to write progress to
    :param outstream: Optional stream to write progress to (deprecated)
    :param origin: Name of remote from the repository used to clone
    :param depth: Depth to fetch at
    :return: The new repository
    """
    # TODO(jelmer): This code overlaps quite a bit with Repo.clone
    if outstream is not None:
        import warnings
        warnings.warn(
            "outstream= has been deprecated in favour of errstream=.",
            DeprecationWarning, stacklevel=3)
        errstream = outstream

    if checkout is None:
        checkout = (not bare)
    if checkout and bare:
        raise ValueError("checkout and bare are incompatible")

    if target is None:
        target = source.split("/")[-1]

    if not os.path.exists(target):
        os.mkdir(target)

    if bare:
        r = Repo.init_bare(target)
Loading ...