Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
"""
Helpers for filesystem related routines.
"""

from oslo_concurrency import processutils
from oslo_log import log as logging

import s3fuse.privsep


LOG = logging.getLogger(__name__)


@s3fuse.privsep.sys_admin_pctxt.entrypoint
def mount(fstype, device, mountpoint, options, timeout=0):
    if isinstance(timeout, int) and timeout > 0:
        mount_cmd = ['timeout', '-sKILL', timeout, 'mount']
    else:
        mount_cmd = ['mount']
    if fstype:
        mount_cmd.extend(['-t', fstype])
    if isinstance(options, tuple) and len(options) > 0:
        mount_cmd.extend(list(options))
    elif isinstance(options, list) and len(options) > 0:
        mount_cmd.extend(options)
    mount_cmd.extend([device, mountpoint])
    return processutils.execute(*mount_cmd)

@s3fuse.privsep.sys_admin_pctxt.entrypoint
def umount(mountpoint, options, timeout=0):
    if isinstance(timeout, int) and timeout > 0:
        umount_cmd = ['timeout', '-sKILL', timeout, 'umount']
    else:
        umount_cmd = ['umount']
    if isinstance(options, tuple) and len(options) > 0:
        umount_cmd.extend(list(options))
    elif isinstance(options, list) and len(options) > 0:
        umount_cmd.extend(options)
    umount_cmd.extend([mountpoint])
    return processutils.execute(*umount_cmd)

@s3fuse.privsep.sys_admin_pctxt.entrypoint
def chown(path, options = []):
    chown_cmd = ['chown']
    if isinstance(options, tuple) and len(options) > 0:
        chown_cmd.extend(list(options))
    elif isinstance(options, list) and len(options) > 0:
        chown_cmd.extend(options)
    chown_cmd.extend([path])
    return processutils.execute(*chown_cmd)

@s3fuse.privsep.sys_admin_pctxt.entrypoint
def mkdir(dir_path, options = []):
    mkdir_cmd = ['mkdir']
    if isinstance(options, tuple) and len(options) > 0:
        mkdir_cmd.extend(list(options))
    elif isinstance(options, list) and len(options) > 0:
        mkdir_cmd.extend(options)
    mkdir_cmd.extend([dir_path])
    return processutils.execute(*mkdir_cmd)

@s3fuse.privsep.sys_admin_pctxt.entrypoint
def copy(source, target, options = []):
    cp_cmd = ['cp']
    if isinstance(options, tuple) and len(options) > 0:
        cp_cmd.extend(list(options))
    elif isinstance(options, list) and len(options) > 0:
        cp_cmd.extend(options)
    cp_cmd.extend([source, target])
    return processutils.execute(*cp_cmd)

@s3fuse.privsep.sys_admin_pctxt.entrypoint
def remove(path, options = []):
    rm_cmd = ['rm -rf']
    if isinstance(options, tuple) and len(options) > 0:
        rm_cmd.extend(list(options))
    elif isinstance(options, list) and len(options) > 0:
        rm_cmd.extend(options)
    rm_cmd.extend([path])
    return processutils.execute(*rm_cmd)