Repository URL to install this package:
|
Version:
4.1.94.1.dev12 ▾
|
"""
Helpers for filesystem related routines.
"""
from oslo_concurrency import processutils
from oslo_log import log as logging
import s3fuse.privsep
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
@s3fuse.privsep.sys_admin_pctxt.entrypoint
def mount(fstype, device, mountpoint, options, timeout=0):
    """Run ``mount`` for *device* on *mountpoint*.

    :param fstype: filesystem type passed via ``-t``; omitted when falsy.
    :param device: device/source argument handed to ``mount``.
    :param mountpoint: target argument handed to ``mount``.
    :param options: list or tuple of extra command-line arguments placed
        before the device; a value of any other type is ignored.
    :param timeout: when a positive int, the command is wrapped in
        ``timeout -sKILL <timeout>``; otherwise ``mount`` runs unwrapped.
    :returns: whatever ``processutils.execute`` returns for the command.
    """
    argv = ['mount']
    if isinstance(timeout, int) and timeout > 0:
        # Prefix with coreutils ``timeout`` so a hung mount gets SIGKILLed.
        argv = ['timeout', '-sKILL', timeout] + argv

    if fstype:
        argv += ['-t', fstype]

    if isinstance(options, (list, tuple)):
        # Extending with an empty sequence is a no-op, so no length check.
        argv.extend(options)

    argv += [device, mountpoint]
    return processutils.execute(*argv)
@s3fuse.privsep.sys_admin_pctxt.entrypoint
def umount(mountpoint, options, timeout=0):
    """Run ``umount`` on *mountpoint*.

    :param mountpoint: final argument handed to ``umount``.
    :param options: list or tuple of extra command-line arguments placed
        before the mountpoint; a value of any other type is ignored.
    :param timeout: when a positive int, the command is wrapped in
        ``timeout -sKILL <timeout>``; otherwise ``umount`` runs unwrapped.
    :returns: whatever ``processutils.execute`` returns for the command.
    """
    use_timeout = isinstance(timeout, int) and timeout > 0
    prefix = ['timeout', '-sKILL', timeout] if use_timeout else []

    extra_args = []
    if isinstance(options, (list, tuple)):
        extra_args = list(options)

    full_cmd = prefix + ['umount'] + extra_args + [mountpoint]
    return processutils.execute(*full_cmd)
@s3fuse.privsep.sys_admin_pctxt.entrypoint
def chown(path, options=None):
    """Run ``chown`` with *options* followed by *path*.

    :param path: final argument handed to ``chown``.
    :param options: list or tuple of arguments placed before *path*; a
        value of any other type (including the default ``None``) is
        ignored. Presumably callers pass the owner spec here, since
        nothing else in this function supplies one -- verify against
        callers.
    :returns: whatever ``processutils.execute`` returns for the command.
    """
    # ``None`` replaces the former mutable ``[]`` default; both result in
    # no extra arguments being added.
    chown_cmd = ['chown']
    if isinstance(options, (list, tuple)):
        chown_cmd.extend(options)
    chown_cmd.append(path)
    return processutils.execute(*chown_cmd)
@s3fuse.privsep.sys_admin_pctxt.entrypoint
def mkdir(dir_path, options=None):
    """Run ``mkdir`` with *options* followed by *dir_path*.

    :param dir_path: final argument handed to ``mkdir``.
    :param options: list or tuple of extra command-line arguments placed
        before *dir_path*; a value of any other type (including the
        default ``None``) is ignored.
    :returns: whatever ``processutils.execute`` returns for the command.
    """
    # ``None`` replaces the former mutable ``[]`` default; both result in
    # no extra arguments being added.
    mkdir_cmd = ['mkdir']
    if isinstance(options, (list, tuple)):
        mkdir_cmd.extend(options)
    mkdir_cmd.append(dir_path)
    return processutils.execute(*mkdir_cmd)
@s3fuse.privsep.sys_admin_pctxt.entrypoint
def copy(source, target, options=None):
    """Run ``cp`` with *options* followed by *source* and *target*.

    :param source: first positional argument handed to ``cp``.
    :param target: second positional argument handed to ``cp``.
    :param options: list or tuple of extra command-line arguments placed
        before *source*; a value of any other type (including the
        default ``None``) is ignored.
    :returns: whatever ``processutils.execute`` returns for the command.
    """
    # ``None`` replaces the former mutable ``[]`` default; both result in
    # no extra arguments being added.
    cp_cmd = ['cp']
    if isinstance(options, (list, tuple)):
        cp_cmd.extend(options)
    cp_cmd.extend([source, target])
    return processutils.execute(*cp_cmd)
@s3fuse.privsep.sys_admin_pctxt.entrypoint
def remove(path, options=None):
    """Run ``rm -rf`` with *options* followed by *path*.

    :param path: final argument handed to ``rm``.
    :param options: list or tuple of extra command-line arguments placed
        between ``-rf`` and *path*; a value of any other type (including
        the default ``None``) is ignored.
    :returns: whatever ``processutils.execute`` returns for the command.
    """
    # The program name and its flags must be separate argv elements: the
    # command is executed without a shell, so a single 'rm -rf' string
    # would be looked up as an executable literally named "rm -rf".
    rm_cmd = ['rm', '-rf']
    # ``None`` replaces the former mutable ``[]`` default; both result in
    # no extra arguments being added.
    if isinstance(options, (list, tuple)):
        rm_cmd.extend(options)
    rm_cmd.append(path)
    return processutils.execute(*rm_cmd)