# swift.py -- Repo implementation atop OpenStack SWIFT
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Fabien Boucher <fabien.boucher@enovance.com>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
"""Repo implementation atop OpenStack SWIFT."""
# TODO: Refactor to share more code with dulwich/repo.py.
# TODO(fbo): Second attempt to _send() must be notified via real log
# TODO(fbo): More logs for operations
import os
import stat
import zlib
import tempfile
import posixpath
try:
import urlparse
except ImportError:
import urllib.parse as urlparse
from io import BytesIO
try:
from ConfigParser import ConfigParser
except ImportError:
from configparser import ConfigParser
from geventhttpclient import HTTPClient
from dulwich.greenthreads import (
GreenThreadsMissingObjectFinder,
GreenThreadsObjectStoreIterator,
)
from dulwich.lru_cache import LRUSizeCache
from dulwich.objects import (
Blob,
Commit,
Tree,
Tag,
S_ISGITLINK,
)
from dulwich.object_store import (
PackBasedObjectStore,
PACKDIR,
INFODIR,
)
from dulwich.pack import (
PackData,
Pack,
PackIndexer,
PackStreamCopier,
write_pack_header,
compute_file_sha,
iter_sha1,
write_pack_index_v2,
load_pack_index_file,
read_pack_header,
_compute_object_size,
unpack_object,
write_pack_object,
)
from dulwich.protocol import TCP_GIT_PORT
from dulwich.refs import (
InfoRefsContainer,
read_info_refs,
write_info_refs,
)
from dulwich.repo import (
BaseRepo,
OBJECTDIR,
)
from dulwich.server import (
Backend,
TCPGitServer,
)
try:
from simplejson import loads as json_loads
from simplejson import dumps as json_dumps
except ImportError:
from json import loads as json_loads
from json import dumps as json_dumps
import sys
"""
# Configuration file sample
[swift]
# Authentication URL (Keystone or Swift)
auth_url = http://127.0.0.1:5000/v2.0
# Authentication version to use
auth_ver = 2
# The tenant and username separated by a semicolon
username = admin;admin
# The user password
password = pass
# The Object storage region to use (auth v2) (Default RegionOne)
region_name = RegionOne
# The Object storage endpoint URL to use (auth v2) (Default internalURL)
endpoint_type = internalURL
# Concurrency to use for parallel tasks (Default 10)
concurrency = 10
# Size of the HTTP pool (Default 10)
http_pool_length = 10
# Timeout delay for HTTP connections (Default 20)
http_timeout = 20
# Chunk size to read from pack (Bytes) (Default 12228)
chunk_length = 12228
# Cache size (MBytes) (Default 20)
cache_length = 20
"""
class PackInfoObjectStoreIterator(GreenThreadsObjectStoreIterator):
    """Object store iterator whose length is computed by draining the finder.

    ``__len__`` repeatedly calls ``self.finder.next()`` until the finder has
    nothing left in ``objects_to_send``, recording each result in
    ``self._shas``.
    """

    def __len__(self):
        """Drain ``self.finder`` and return the number of collected entries.

        ``finder.next()`` may return None when ``objects_to_send`` runs out
        in the middle of a batch (e.g. the remaining entries were already
        processed). Such results are skipped so that only real entries are
        counted and stored.
        """
        while len(self.finder.objects_to_send):
            # The batch size is snapshotted here; entries that next() queues
            # while this batch runs are handled by the next outer pass.
            for _ in range(0, len(self.finder.objects_to_send)):
                entry = self.finder.next()
                if entry is not None:
                    self._shas.append(entry)
        return len(self._shas)
class PackInfoMissingObjectFinder(GreenThreadsMissingObjectFinder):
    """Missing-object finder that expands the object graph from pack info.

    Instead of loading full objects, it asks
    ``self.object_store.pack_info_get(sha)`` for a compact summary entry
    (presumably the per-object entries built by ``pack_info_create`` --
    confirm against the object store implementation).
    """

    def next(self):
        """Return the next ``(sha, name)`` pair to send, or None when done.

        Entries are popped from ``objects_to_send`` until one whose SHA is
        not yet in ``sha_done`` is found. For non-leaf entries, the children
        recorded in the pack info are queued: a commit's tree, a tree's
        entries, or a tag's target.
        """
        while True:
            if not self.objects_to_send:
                return None
            (sha, name, leaf) = self.objects_to_send.pop()
            if sha not in self.sha_done:
                break
        if not leaf:
            info = self.object_store.pack_info_get(sha)
            # pack_info_create records blobs as None, and a blob can be queued
            # as a non-leaf (e.g. as a tag target below), so there may be
            # nothing to expand.
            if info is not None:
                kind = info[0]
                if kind == Commit.type_num:
                    # Commit entry: (type_num, parents, tree) -> queue the tree.
                    self.add_todo([(info[2], "", False)])
                elif kind == Tree.type_num:
                    # Tree entry: (type_num, [(sha, name, leaf), ...]). After a
                    # JSON round-trip the inner tuples come back as lists.
                    self.add_todo([tuple(child) for child in info[1]])
                elif kind == Tag.type_num:
                    # Tag entry: (type_num, target_sha).
                    self.add_todo([(info[1], None, False)])
            if sha in self._tagged:
                self.add_todo([(self._tagged[sha], None, True)])
        self.sha_done.add(sha)
        self.progress("counting objects: %d\r" % len(self.sha_done))
        return (sha, name)
def load_conf(path=None, file=None):
    """Load the Swift backend configuration and return it.

    :param path: The path to the configuration file. If neither ``file``
        nor ``path`` is given, the path is read from the
        ``DULWICH_SWIFT_CFG`` environment variable.
    :param file: If provided, read the configuration from this file-like
        object instead of from disk; ``path`` is then only passed to
        ``read_file`` as the source name.
    :return: a `ConfigParser` instance holding the parsed configuration
    :raise Exception: if no configuration path can be determined, or the
        path does not point to an existing file
    """
    conf = ConfigParser()
    if file:
        try:
            conf.read_file(file, path)
        except AttributeError:
            # read_file only exists in Python3
            conf.readfp(file)
        return conf
    confpath = path if path else os.environ.get('DULWICH_SWIFT_CFG')
    if confpath is None:
        raise Exception("You need to specify a configuration file")
    if not os.path.isfile(confpath):
        raise Exception("Unable to read configuration file %s" % confpath)
    conf.read(confpath)
    return conf
def swift_load_pack_index(scon, filename):
    """Read a pack index file from Swift.

    The object is fetched through ``scon.get_object`` and the returned
    file-like object is always closed afterwards.

    :param scon: a `SwiftConnector` instance
    :param filename: Path of the index file object in the container
    :return: the pack index object returned by `load_pack_index_file`
    """
    f = scon.get_object(filename)
    try:
        return load_pack_index_file(filename, f)
    finally:
        f.close()
def pack_info_create(pack_data, pack_index):
    """Build a zlib-compressed JSON summary of every object in a pack.

    The summary is a dict keyed by object id:

    * commit -> ``(type_num, parents, tree)``
    * tree   -> ``(type_num, [(sha, name, is_leaf), ...])``, skipping gitlink
      entries; ``is_leaf`` is true for any entry that is not a directory
    * blob   -> ``None``
    * tag    -> ``(type_num, target_sha)``

    Objects of any other type are not recorded.

    NOTE(review): on Python 3 ``json.dumps`` returns ``str`` while
    ``zlib.compress`` requires bytes, so this path looks Python-2 only --
    confirm before relying on it under Python 3.
    """
    pack = Pack.from_objects(pack_data, pack_index)
    summary = {}
    for entry in pack.iterobjects():
        kind = entry.type_num
        if kind == Commit.type_num:
            summary[entry.id] = (kind, entry.parents, entry.tree)
        elif kind == Tree.type_num:
            children = []
            for entry_name, entry_mode, entry_sha in entry.items():
                if S_ISGITLINK(entry_mode):
                    continue
                children.append(
                    (entry_sha, entry_name, not stat.S_ISDIR(entry_mode)))
            summary[entry.id] = (kind, children)
        elif kind == Blob.type_num:
            summary[entry.id] = None
        elif kind == Tag.type_num:
            summary[entry.id] = (kind, entry.object[1])
    return zlib.compress(json_dumps(summary))
def load_pack_info(filename, scon=None, file=None):
    """Load and decode a pack info summary.

    :param filename: name of the pack info object, used with
        ``scon.get_object`` when ``file`` is not given
    :param scon: a `SwiftConnector` used to fetch ``filename``
    :param file: optional already-open file-like object to read from instead
    :return: the decoded summary, or None if no source object is available
    The source file-like object is closed after reading, including a
    caller-supplied ``file``.
    """
    source = file if file else scon.get_object(filename)
    if not source:
        return None
    try:
        raw = zlib.decompress(source.read())
        return json_loads(raw)
    finally:
        source.close()
class SwiftException(Exception):
    """Error raised when a request to Swift (e.g. authentication) fails."""
class SwiftConnector(object):
"""A Connector to swift that manage authentication and errors catching
"""
def __init__(self, root, conf):
    """Initialize a SwiftConnector and authenticate against Swift.

    Optional settings in the ``[swift]`` section fall back to their
    defaults when absent or empty/zero: concurrency 10, http_timeout 20,
    http_pool_length 10, region_name "RegionOne", endpoint_type
    "internalURL", cache_length 20, chunk_length 12228. ``auth_ver``,
    ``auth_url``, ``username`` and ``password`` are required.

    :param root: The swift container that will act as Git bare repository
    :param conf: A ConfigParser Object
    :raise NotImplementedError: if ``auth_ver`` is neither "1" nor "2"
    """
    def _option(getter, option, default):
        # ConfigParser.get/getint raise NoOptionError for a missing option,
        # so an ``or default`` alone never supplies the default for omitted
        # settings; check presence first, then keep the falsy-value fallback.
        if not conf.has_option("swift", option):
            return default
        return getter("swift", option) or default

    self.conf = conf
    self.auth_ver = self.conf.get("swift", "auth_ver")
    if self.auth_ver not in ["1", "2"]:
        raise NotImplementedError(
            "Wrong authentication version use either 1 or 2")
    self.auth_url = self.conf.get("swift", "auth_url")
    self.user = self.conf.get("swift", "username")
    self.password = self.conf.get("swift", "password")
    self.concurrency = _option(self.conf.getint, 'concurrency', 10)
    self.http_timeout = _option(self.conf.getint, 'http_timeout', 20)
    self.http_pool_length = \
        _option(self.conf.getint, 'http_pool_length', 10)
    self.region_name = _option(self.conf.get, "region_name", "RegionOne")
    self.endpoint_type = \
        _option(self.conf.get, "endpoint_type", "internalURL")
    self.cache_length = _option(self.conf.getint, "cache_length", 20)
    self.chunk_length = _option(self.conf.getint, "chunk_length", 12228)
    self.root = root
    block_size = 1024 * 12  # 12KB
    if self.auth_ver == "1":
        self.storage_url, self.token = self.swift_auth_v1()
    else:
        self.storage_url, self.token = self.swift_auth_v2()
    token_header = {'X-Auth-Token': str(self.token)}
    self.httpclient = \
        HTTPClient.from_url(str(self.storage_url),
                            concurrency=self.http_pool_length,
                            block_size=block_size,
                            connection_timeout=self.http_timeout,
                            network_timeout=self.http_timeout,
                            headers=token_header)
    self.base_path = str(posixpath.join(
        urlparse.urlparse(self.storage_url).path, self.root))
def swift_auth_v1(self):
    """Authenticate with the Swift v1.0 auth API.

    As a side effect, every ";" in ``self.user`` (configured as
    ``tenant;user``) is replaced by ":" before it is sent as
    ``X-Auth-User``.

    :return: a ``(storage_url, token)`` tuple taken from the
        ``X-Storage-Url`` and ``X-Auth-Token`` response headers
    :raise SwiftException: if the response status is outside 2xx
    """
    self.user = self.user.replace(";", ":")
    client = HTTPClient.from_url(
        self.auth_url,
        connection_timeout=self.http_timeout,
        network_timeout=self.http_timeout,
    )
    credentials = {'X-Auth-User': self.user,
                   'X-Auth-Key': self.password}
    auth_path = urlparse.urlparse(self.auth_url).path
    response = client.request('GET', auth_path, headers=credentials)
    # Redirections (e.g. 301) are not handled: any non-2xx status is
    # reported as a failure.
    if not 200 <= response.status_code < 300:
        target = str(client.get_base_url()) + auth_path
        raise SwiftException(
            'AUTH v1.0 request failed on %s with error code %s (%s)'
            % (target, response.status_code, str(response.items())))
    return response['X-Storage-Url'], response['X-Auth-Token']
def swift_auth_v2(self):
self.tenant, self.user = self.user.split(';')
auth_dict = {}
auth_dict['auth'] = {'passwordCredentials':
{
'username': self.user,
'password': self.password,
},
'tenantName': self.tenant}
auth_json = json_dumps(auth_dict)
headers = {'Content-Type': 'application/json'}
auth_httpclient = HTTPClient.from_url(
self.auth_url,
connection_timeout=self.http_timeout,
network_timeout=self.http_timeout,
)
path = urlparse.urlparse(self.auth_url).path
if not path.endswith('tokens'):
path = posixpath.join(path, 'tokens')
ret = auth_httpclient.request('POST', path,
body=auth_json,
headers=headers)
if ret.status_code < 200 or ret.status_code >= 300:
raise SwiftException('AUTH v2.0 request failed on ' +
'%s with error code %s (%s)'
% (str(auth_httpclient.get_base_url()) +
path, ret.status_code,
str(ret.items())))
Loading ...