Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / dulwich   python

Repository URL to install this package:

/ contrib / swift.py

# swift.py -- Repo implementation atop OpenStack SWIFT
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Fabien Boucher <fabien.boucher@enovance.com>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Repo implementation atop OpenStack SWIFT."""

# TODO: Refactor to share more code with dulwich/repo.py.
# TODO(fbo): Second attempt to _send() must be notified via real log
# TODO(fbo): More logs for operations

import os
import stat
import zlib
import tempfile
import posixpath

try:
    import urlparse
except ImportError:
    import urllib.parse as urlparse

from io import BytesIO
try:
    from ConfigParser import ConfigParser
except ImportError:
    from configparser import ConfigParser
from geventhttpclient import HTTPClient

from dulwich.greenthreads import (
    GreenThreadsMissingObjectFinder,
    GreenThreadsObjectStoreIterator,
    )

from dulwich.lru_cache import LRUSizeCache
from dulwich.objects import (
    Blob,
    Commit,
    Tree,
    Tag,
    S_ISGITLINK,
    )
from dulwich.object_store import (
    PackBasedObjectStore,
    PACKDIR,
    INFODIR,
    )
from dulwich.pack import (
    PackData,
    Pack,
    PackIndexer,
    PackStreamCopier,
    write_pack_header,
    compute_file_sha,
    iter_sha1,
    write_pack_index_v2,
    load_pack_index_file,
    read_pack_header,
    _compute_object_size,
    unpack_object,
    write_pack_object,
    )
from dulwich.protocol import TCP_GIT_PORT
from dulwich.refs import (
    InfoRefsContainer,
    read_info_refs,
    write_info_refs,
    )
from dulwich.repo import (
    BaseRepo,
    OBJECTDIR,
    )
from dulwich.server import (
    Backend,
    TCPGitServer,
    )

try:
    from simplejson import loads as json_loads
    from simplejson import dumps as json_dumps
except ImportError:
    from json import loads as json_loads
    from json import dumps as json_dumps

import sys


"""
# Configuration file sample
[swift]
# Authentication URL (Keystone or Swift)
auth_url = http://127.0.0.1:5000/v2.0
# Authentication version to use
auth_ver = 2
# The tenant and username separated by a semicolon
username = admin;admin
# The user password
password = pass
# The Object storage region to use (auth v2) (Default RegionOne)
region_name = RegionOne
# The Object storage endpoint URL to use (auth v2) (Default internalURL)
endpoint_type = internalURL
# Concurrency to use for parallel tasks (Default 10)
concurrency = 10
# Size of the HTTP pool (Default 10)
http_pool_length = 10
# Timeout delay for HTTP connections (Default 20)
http_timeout = 20
# Chunk size to read from pack (Bytes) (Default 12228)
chunk_length = 12228
# Cache size (MBytes) (Default 20)
cache_length = 20
"""


class PackInfoObjectStoreIterator(GreenThreadsObjectStoreIterator):
    """Object store iterator whose length is computed by draining the finder.
    """

    def __len__(self):
        """Exhaust the finder's pending queue and return the number of results.

        Every value returned by ``self.finder.next()`` is appended to
        ``self._shas``; the loop ends once ``objects_to_send`` is empty.
        """
        finder = self.finder
        collected = self._shas
        while len(finder.objects_to_send) > 0:
            # Snapshot the queue size: next() may enqueue further entries,
            # which are picked up by the following pass of the outer loop.
            batch = len(finder.objects_to_send)
            for _ in range(batch):
                collected.append(finder.next())
        return len(self._shas)


class PackInfoMissingObjectFinder(GreenThreadsMissingObjectFinder):
    """Missing object finder that walks the graph using pack info records
    obtained from the object store instead of the objects themselves.
    """

    def next(self):
        """Return the next ``(sha, name)`` pair to send.

        :return: a ``(sha, name)`` tuple, or None when ``objects_to_send``
            has been emptied
        """
        sha = name = leaf = None
        fresh = False
        # Pop entries until one that has not been handled yet shows up.
        while not fresh:
            if not self.objects_to_send:
                return None
            sha, name, leaf = self.objects_to_send.pop()
            fresh = sha not in self.sha_done

        if not leaf:
            # NOTE(review): the record layout presumably matches what
            # pack_info_create() writes -- confirm against pack_info_get().
            info = self.object_store.pack_info_get(sha)
            kind = info[0]
            if kind == Commit.type_num:
                todo = [(info[2], "", False)]
            elif kind == Tree.type_num:
                todo = [tuple(entry) for entry in info[1]]
            elif kind == Tag.type_num:
                todo = [(info[1], None, False)]
            else:
                todo = None
            if todo is not None:
                self.add_todo(todo)
            if sha in self._tagged:
                self.add_todo([(self._tagged[sha], None, True)])

        self.sha_done.add(sha)
        self.progress("counting objects: %d\r" % len(self.sha_done))
        return (sha, name)


def load_conf(path=None, file=None):
    """Build and return a ConfigParser holding the Swift configuration.

    :param path: The path to the configuration file; when omitted the
        DULWICH_SWIFT_CFG environment variable is used instead
    :param file: If provided read instead the file like object
    :return: the populated `ConfigParser` instance
    :raise Exception: if no configuration file was specified or if the
        given path is not an existing file
    """
    parser = ConfigParser()

    if file:
        try:
            parser.read_file(file, path)
        except AttributeError:
            # read_file only exists in Python3
            parser.readfp(file)
        return parser

    if path:
        confpath = path
    else:
        if 'DULWICH_SWIFT_CFG' not in os.environ:
            raise Exception("You need to specify a configuration file")
        confpath = os.environ['DULWICH_SWIFT_CFG']

    if os.path.isfile(confpath):
        parser.read(confpath)
        return parser
    raise Exception("Unable to read configuration file %s" % confpath)


def swift_load_pack_index(scon, filename):
    """Read a pack index file from Swift

    :param scon: a `SwiftConnector` instance
    :param filename: Path to the index file object
    :return: whatever `load_pack_index_file` builds from the fetched object
    """
    index_file = scon.get_object(filename)
    try:
        pack_index = load_pack_index_file(filename, index_file)
    finally:
        # The fetched object is closed whether or not parsing succeeded.
        index_file.close()
    return pack_index


def pack_info_create(pack_data, pack_index):
    """Build the compressed pack info blob describing every object of a pack

    The result maps each object id to a small record:

    * commit: ``(type_num, parents, tree)``
    * tree: ``(type_num, [(sha, name, is_leaf), ...])`` without gitlinks
    * blob: ``None``
    * tag: ``(type_num, sha of the tagged object)``

    :param pack_data: pack data handed to `Pack.from_objects`
    :param pack_index: pack index handed to `Pack.from_objects`
    :return: the zlib-compressed JSON dump of the mapping
    """
    info = {}
    for obj in Pack.from_objects(pack_data, pack_index).iterobjects():
        kind = obj.type_num
        if kind == Commit.type_num:
            record = (kind, obj.parents, obj.tree)
        elif kind == Tree.type_num:
            children = []
            for name, mode, sha in obj.items():
                if S_ISGITLINK(mode):
                    # Submodule entries are left out of the record.
                    continue
                # An entry is a leaf unless its mode marks a directory.
                children.append((sha, name, not stat.S_ISDIR(mode)))
            record = (kind, children)
        elif kind == Blob.type_num:
            record = None
        elif kind == Tag.type_num:
            record = (kind, obj.object[1])
        else:
            # Any other object type gets no entry.
            continue
        info[obj.id] = record
    serialized = json_dumps(info)
    return zlib.compress(serialized)


def load_pack_info(filename, scon=None, file=None):
    """Load and decode a pack info blob

    :param filename: Path of the pack info object (used when fetching
        through `scon`)
    :param scon: connector whose ``get_object`` is called when no `file`
        is given
    :param file: If provided read instead the file like object
    :return: the decoded JSON content, or None when no object is available
    """
    stream = file if file else scon.get_object(filename)
    if not stream:
        return None
    try:
        compressed = stream.read()
        return json_loads(zlib.decompress(compressed))
    finally:
        stream.close()


class SwiftException(Exception):
    """Error raised by this module when a Swift request fails."""


class SwiftConnector(object):
    """A Connector to swift that manage authentication and errors catching
    """

    def __init__(self, root, conf):
        """ Initialize a SwiftConnector

        :param root: The swift container that will act as Git bare repository
        :param conf: A ConfigParser Object
        """
        self.conf = conf
        self.auth_ver = self.conf.get("swift", "auth_ver")
        if self.auth_ver not in ["1", "2"]:
            raise NotImplementedError(
                "Wrong authentication version use either 1 or 2")
        self.auth_url = self.conf.get("swift", "auth_url")
        self.user = self.conf.get("swift", "username")
        self.password = self.conf.get("swift", "password")
        self.concurrency = self.conf.getint('swift', 'concurrency') or 10
        self.http_timeout = self.conf.getint('swift', 'http_timeout') or 20
        self.http_pool_length = \
            self.conf.getint('swift', 'http_pool_length') or 10
        self.region_name = self.conf.get("swift", "region_name") or "RegionOne"
        self.endpoint_type = \
            self.conf.get("swift", "endpoint_type") or "internalURL"
        self.cache_length = self.conf.getint("swift", "cache_length") or 20
        self.chunk_length = self.conf.getint("swift", "chunk_length") or 12228
        self.root = root
        block_size = 1024 * 12  # 12KB
        if self.auth_ver == "1":
            self.storage_url, self.token = self.swift_auth_v1()
        else:
            self.storage_url, self.token = self.swift_auth_v2()

        token_header = {'X-Auth-Token': str(self.token)}
        self.httpclient = \
            HTTPClient.from_url(str(self.storage_url),
                                concurrency=self.http_pool_length,
                                block_size=block_size,
                                connection_timeout=self.http_timeout,
                                network_timeout=self.http_timeout,
                                headers=token_header)
        self.base_path = str(posixpath.join(
                urlparse.urlparse(self.storage_url).path, self.root))

    def swift_auth_v1(self):
        """Authenticate with the v1.0 auth API

        The ``tenant;user`` form stored in ``self.user`` is rewritten to
        ``tenant:user`` before being sent.

        :return: a ``(storage_url, token)`` tuple read from the response
            headers
        :raise SwiftException: if the response status is not a 2xx
        """
        self.user = self.user.replace(";", ":")
        client = HTTPClient.from_url(
            self.auth_url,
            connection_timeout=self.http_timeout,
            network_timeout=self.http_timeout,
            )
        credentials = {'X-Auth-User': self.user,
                       'X-Auth-Key': self.password}
        auth_path = urlparse.urlparse(self.auth_url).path

        response = client.request('GET', auth_path, headers=credentials)

        # Should do something with redirections (301 in my case)

        if not 200 <= response.status_code < 300:
            failed_url = str(client.get_base_url()) + auth_path
            raise SwiftException(
                'AUTH v1.0 request failed on '
                '%s with error code %s (%s)'
                % (failed_url, response.status_code,
                   str(response.items())))
        return response['X-Storage-Url'], response['X-Auth-Token']

    def swift_auth_v2(self):
        self.tenant, self.user = self.user.split(';')
        auth_dict = {}
        auth_dict['auth'] = {'passwordCredentials':
                             {
                                 'username': self.user,
                                 'password': self.password,
                             },
                             'tenantName': self.tenant}
        auth_json = json_dumps(auth_dict)
        headers = {'Content-Type': 'application/json'}
        auth_httpclient = HTTPClient.from_url(
            self.auth_url,
            connection_timeout=self.http_timeout,
            network_timeout=self.http_timeout,
            )
        path = urlparse.urlparse(self.auth_url).path
        if not path.endswith('tokens'):
            path = posixpath.join(path, 'tokens')
        ret = auth_httpclient.request('POST', path,
                                      body=auth_json,
                                      headers=headers)

        if ret.status_code < 200 or ret.status_code >= 300:
            raise SwiftException('AUTH v2.0 request failed on ' +
                                 '%s with error code %s (%s)'
                                 % (str(auth_httpclient.get_base_url()) +
                                    path, ret.status_code,
                                    str(ret.items())))
Loading ...