# Package version: 5.2.7
"""Utilities for s3fuse plugin."""
# Copyright 2015 TrilioData Inc.
# All Rights Reserved.
import errno
import fcntl
import os
import socket
import struct
import subprocess
from multiprocessing import Process
from time import sleep
from oslo_concurrency import processutils
try:
import ConfigParser
except ImportError:
import configparser as ConfigParser
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
def run_process(method, progress_tracking_file_path, *args):
    """Run *method* in a child process and heartbeat the progress file.

    Starts ``method(*args)`` in a separate :class:`multiprocessing.Process`.
    If the child is still alive after startup, wait briefly and refresh the
    mtime of the progress tracking file so the workloadmgr knows contego is
    alive and kicking.

    :param method: Callable executed in the child process.
    :param progress_tracking_file_path: Existing file whose timestamp is
        updated as a liveness heartbeat.
    :param args: Positional arguments forwarded to ``method``.
    """
    # The old ``except Exception as ex: raise ex`` wrapper was a no-op that
    # only rewrote the traceback; let exceptions propagate naturally.
    process = Process(target=method, args=args)
    process.start()
    if process.is_alive():
        sleep(10)
        # Update the timestamp so the workloadmgr knows contego
        # is alive and kicking.
        os.utime(progress_tracking_file_path, None)
def get_ip_address(ifname):
    """Return the IPv4 address bound to network interface *ifname*.

    Uses the Linux SIOCGIFADDR ioctl on a throwaway UDP socket.

    :param ifname: Interface name, e.g. ``'eth0'`` (str or bytes).
    :returns: Dotted-quad IPv4 address string.
    """
    if isinstance(ifname, str):
        # struct.pack('256s', ...) requires bytes on Python 3.
        ifname = ifname.encode('utf-8')
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        ifreq = fcntl.ioctl(
            s.fileno(),
            0x8915,  # SIOCGIFADDR
            struct.pack('256s', ifname[:15]))
        return socket.inet_ntoa(ifreq[20:24])
    finally:
        # The original leaked the socket; always close it.
        s.close()
def get_lan_ip(config_file):
    """Return this node's configured IP, falling back to the hostname.

    Reads the ``my_ip`` option from the ``DEFAULT`` section of the given
    INI file.

    :param config_file: Path to an INI-style configuration file.
    :returns: The configured ``my_ip`` string, or ``socket.gethostname()``
        when the option is absent.
    """
    def get_config_value(cfg, section, option):
        # Bug fix: the old code passed a third positional argument to
        # cfg.get(); in Python 3 configparser those extras are
        # keyword-only, so every call raised TypeError.
        if cfg.has_option(section, option):
            return cfg.get(section, option)
        return None

    cfg = ConfigParser.ConfigParser()
    cfg.read(config_file)
    ip = get_config_value(cfg, 'DEFAULT', 'my_ip')
    if ip is not None:
        return ip
    return socket.gethostname()
def ensure_tree(path):
    """Create a directory (and any ancestor directories required).

    :param path: Directory to create
    """
    try:
        os.makedirs(path)
    except OSError as exc:
        # Tolerate a pre-existing directory; re-raise everything else,
        # including EEXIST where the path exists but is not a directory.
        if exc.errno != errno.EEXIST or not os.path.isdir(path):
            raise
def touch_file(file_path):
    """Simulate the filesystem "touch" command for *file_path*.

    Opening the file in append mode refreshes the timestamp on Fuse
    mounts, while os.utime() handles local/NFS filesystems, so both
    steps are needed to support every backend.

    Args:
        file_path (str): Path to file to be touched.
    """
    handle = open(file_path, 'a')
    try:
        os.utime(file_path, None)
    finally:
        handle.close()
def update_progress(progress_tracking_file_path, progress_msg):
    """Append *progress_msg* to the progress tracking file.

    A failed write is retried once after a 10 second pause; a second
    failure propagates to the caller.
    """
    LOG.info("update_progress %s: %s" %
             (progress_tracking_file_path, progress_msg))
    for attempt in range(2):
        try:
            with open(progress_tracking_file_path, 'a') as tracker:
                tracker.write(progress_msg)
            return
        except Exception:
            if attempt:
                raise
            sleep(10)
def get_progress(progress_tracking_file_path):
    """Return the lines of the progress tracking file.

    A failed read is retried once after a 10 second pause; a second
    failure propagates to the caller.
    """
    for attempt in range(2):
        try:
            with open(progress_tracking_file_path, 'r') as tracker:
                return tracker.readlines()
        except Exception:
            if attempt:
                raise
            sleep(10)
def run_cmd(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=False):
    """Spawn *cmd* and return its subprocess.Popen handle.

    All stream arguments default to pipes so the caller can interact
    with the child process.
    """
    return subprocess.Popen(
        cmd, stdin=stdin, stdout=stdout, stderr=stderr, shell=shell)
def poll_process(
        process,
        cmd,
        sleep_time=5,
        no_of_count=0,
        progress_file=None):
    """Wait for *process* to exit, heartbeating *progress_file*.

    :param process: subprocess.Popen handle to wait on.
    :param cmd: Original command, used only for error logging.
    :param sleep_time: Seconds between poll attempts.
    :param no_of_count: Maximum number of polls; 0 means unlimited.
    :param progress_file: Optional file touched on each iteration so the
        workloadmgr knows contego is alive and kicking.
    :returns: The process return code (None if the poll limit was
        reached before the process exited).
    """
    # The old ``except Exception as ex: raise ex`` wrapper was a no-op;
    # exceptions now propagate with their original traceback.
    count = 0
    while process.poll() is None:
        if no_of_count != 0 and count > no_of_count:
            break
        sleep(sleep_time)
        count += 1
        if progress_file is not None:
            # Update the timestamp so the workloadmgr knows contego
            # is alive and kicking.
            touch_file(progress_file)
    # Guard: stdin is None unless the process was started with a PIPE.
    if process.stdin is not None:
        process.stdin.close()
    _returncode = process.returncode
    if _returncode:
        LOG.error(('Result was %s' % _returncode))
        LOG.exception("Execution error %(exit_code)d (%(stderr)s). "
                      "cmd %(cmd)s" %
                      {'exit_code': _returncode,
                       'stderr': str(process.stderr.read(), 'utf-8'),
                       'cmd': cmd})
    return _returncode
def get_cmd_specs(cmd, **kwargs):
    """Build the argv list for an ``ssh`` or ``scp`` command.

    :param cmd: Either ``'ssh'`` or ``'scp'``.
    :param kwargs: ``authorized_key``, ``user`` and ``host`` are required
        for both commands; ``scp`` additionally requires ``source_path``
        and ``dest_path``.
    :returns: List of command arguments suitable for subprocess.
    :raises Exception: When a required parameter is missing/None or
        *cmd* is not supported.
    """
    def _key_check(values, keys):
        # Fail fast on any missing or None required parameter.
        # (Renamed from ``locals`` which shadowed the builtin.)
        for key in keys:
            if values.get(key) is None:
                raise Exception(
                    "Required parameter: %s for %s "
                    "connection can not be None" %
                    (key, cmd))

    authorized_key = kwargs.get('authorized_key', None)
    user = kwargs.get('user', None)
    host = kwargs.get('host', None)
    if cmd == 'ssh':
        _key_check(kwargs, ['authorized_key', 'user', 'host'])
        return [
            'ssh',
            '-o',
            'StrictHostKeyChecking=no',
            '-i',
            authorized_key,
            user + '@' + host]
    elif cmd == 'scp':
        source_path = kwargs.get('source_path', None)
        dest_path = kwargs.get('dest_path', None)
        _key_check(kwargs, ['authorized_key', 'user',
                            'host', 'source_path', 'dest_path'])
        return [
            'scp',
            '-o',
            'StrictHostKeyChecking=no',
            '-i',
            authorized_key,
            '-r',
            user + '@' + host + ':' + source_path + "/*",
            dest_path]
    # Bug fix: an unsupported cmd previously fell through to an
    # UnboundLocalError on the return statement.
    raise Exception("Unsupported cmd: %s" % cmd)
def get_ssl_cert_path():
    """Return the CA cert bundle path for this Unix distribution.

    Checks the Debian and Red Hat default bundle locations, plus the
    path named by the ``AWS_CA_BUNDLE`` environment variable when set.
    (Bug fix: the docstring was previously a stray string expression
    placed after the first statement, so it was never a docstring.)

    :returns: The first existing cert path, or None when none exist.
    """
    ca_proxy_cert = os.getenv("AWS_CA_BUNDLE")
    ssl_cert = ("/etc/ssl/certs/ca-certificates.crt",
                "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem")
    if ca_proxy_cert:
        ssl_cert = ssl_cert + (ca_proxy_cert,)
    for path in ssl_cert:
        try:
            if os.path.exists(path):
                return path
        except Exception as ex:
            LOG.exception(ex)
            raise
    # Falls through to an implicit None when no bundle was found
    # (preserved behavior).
class get_dummy_object(object):
    """Expose the entries of a keyword dict as object attributes."""

    def __init__(self, **kwargs):
        """Store every keyword argument as an instance attribute."""
        for name, value in kwargs.items():
            setattr(self, name, value)
def mount(fstype, device, mountpoint, options, timeout=0):
    """Mount *device* on *mountpoint* via processutils.execute.

    A positive integer *timeout* wraps the command in ``timeout -sKILL``.
    *options* may be a list or tuple of extra mount arguments; any other
    type is ignored.
    """
    if isinstance(timeout, int) and timeout > 0:
        argv = ['timeout', '-sKILL', timeout, 'mount']
    else:
        argv = ['mount']
    if fstype:
        argv += ['-t', fstype]
    if isinstance(options, (tuple, list)) and len(options) > 0:
        argv += list(options)
    argv += [device, mountpoint]
    return processutils.execute(*argv)
def umount(mountpoint, options, timeout=0):
    """Unmount *mountpoint* via processutils.execute.

    A positive integer *timeout* wraps the command in ``timeout -sKILL``.
    *options* may be a list or tuple of extra umount arguments; any other
    type is ignored.
    """
    if isinstance(timeout, int) and timeout > 0:
        argv = ['timeout', '-sKILL', timeout, 'umount']
    else:
        argv = ['umount']
    if isinstance(options, (tuple, list)) and len(options) > 0:
        argv += list(options)
    argv.append(mountpoint)
    return processutils.execute(*argv)
def chown(path, options=None):
    """Run ``chown`` on *path* via processutils.execute.

    :param path: Filesystem path whose ownership is changed.
    :param options: Optional list/tuple of extra chown arguments
        (e.g. ``['nova:nova']``); any other type is ignored.
    """
    # Bug fix: default was a mutable list literal (shared-state pitfall);
    # None behaves identically for callers that omit the argument.
    chown_cmd = ['chown']
    if isinstance(options, (tuple, list)) and len(options) > 0:
        chown_cmd.extend(list(options))
    chown_cmd.append(path)
    return processutils.execute(*chown_cmd)
def mkdir(dir_path, options=None):
    """Run ``mkdir`` for *dir_path* via processutils.execute.

    :param dir_path: Directory path to create.
    :param options: Optional list/tuple of extra mkdir arguments
        (e.g. ``['-p']``); any other type is ignored.
    """
    # Bug fix: default was a mutable list literal (shared-state pitfall);
    # None behaves identically for callers that omit the argument.
    mkdir_cmd = ['mkdir']
    if isinstance(options, (tuple, list)) and len(options) > 0:
        mkdir_cmd.extend(list(options))
    mkdir_cmd.append(dir_path)
    return processutils.execute(*mkdir_cmd)
def copy(source, target, options=None):
    """Run ``cp`` to copy *source* to *target* via processutils.execute.

    :param source: Source path.
    :param target: Destination path.
    :param options: Optional list/tuple of extra cp arguments
        (e.g. ``['-r']``); any other type is ignored.
    """
    # Bug fix: default was a mutable list literal (shared-state pitfall);
    # None behaves identically for callers that omit the argument.
    cp_cmd = ['cp']
    if isinstance(options, (tuple, list)) and len(options) > 0:
        cp_cmd.extend(list(options))
    cp_cmd.extend([source, target])
    return processutils.execute(*cp_cmd)
def remove(path, options=None):
    """Run ``rm -rf`` on *path* via processutils.execute.

    :param path: Path to remove recursively and forcefully.
    :param options: Optional list/tuple of extra rm arguments; any
        other type is ignored.
    """
    # Bug fix: default was a mutable list literal (shared-state pitfall);
    # None behaves identically for callers that omit the argument.
    rm_cmd = ['rm', '-rf']
    if isinstance(options, (tuple, list)) and len(options) > 0:
        rm_cmd.extend(list(options))
    rm_cmd.append(path)
    return processutils.execute(*rm_cmd)
def get_retention_mode(s3client, bucket_name, lock_enabled):
    """Return the object-lock retention mode of an S3 bucket.

    :param s3client: boto3-style S3 client.
    :param bucket_name: Name of the bucket to inspect.
    :param lock_enabled: Whether object lock is expected to be enabled;
        when True, failure to read the lock configuration is fatal.
    :returns: Retention mode string, or "" when object lock is unused.
    :raises Exception: If the bucket is inaccessible, or (when
        *lock_enabled*) the lock configuration cannot be read.
    """
    # The old outer ``except BaseException: raise`` wrapper was a no-op
    # and has been removed; errors propagate unchanged.
    retention_mode = ""
    LOG.debug("Getting the retention mode of the bucket")
    # Raises if the bucket is missing or inaccessible.
    s3client.head_bucket(Bucket=bucket_name)
    try:
        obj_lock_cfg = s3client.get_object_lock_configuration(
            Bucket=bucket_name)
        if lock_enabled:
            retention_mode = "GOVERNANCE"
        # Get the retention mode. It's GOVERNANCE by default
        if "Rule" in obj_lock_cfg["ObjectLockConfiguration"] and \
                "DefaultRetention" in obj_lock_cfg[
                    "ObjectLockConfiguration"]["Rule"]:
            retention_mode = obj_lock_cfg[
                "ObjectLockConfiguration"][
                "Rule"]["DefaultRetention"]["Mode"]
    except Exception:
        # Buckets without object lock raise here; only fatal when the
        # caller expects locking to be enabled.
        if lock_enabled:
            raise
    return retention_mode