# Repository URL to install this package:
# Version: 4.0.101
# Copyright (c) 2014 TrilioData, Inc.
# All Rights Reserved.
"""
Command-line interface to the OpenStack WorkloadMgr API.
"""
from __future__ import print_function
import argparse
import glob
import imp
import inspect
import itertools
import logging
import os
import pkgutil
import six
import sys
import os_client_config
from cliff import app
from cliff import _argparse
from cliff import command
from cliff import commandmanager
import workloadmgrclient.extension
from workloadmgrclient import client
from workloadmgrclient import exceptions as exc
from workloadmgrclient.common import extension as client_extension
from workloadmgrclient.common.client_manager import ClientManager
from workloadmgrclient.openstack.common import strutils
from workloadmgrclient import utils
from workloadmgrclient.v1 import shell as shell_v1
from workloadmgrclient.version import __version__
# Fallback values used by the option parser and by run() when neither the
# command line nor the environment supplies one.
DEFAULT_OS_WORKLOAD_API_VERSION = "1"
DEFAULT_WORKLOADMGR_ENDPOINT_TYPE = "publicURL"
DEFAULT_WORKLOADMGR_SERVICE_TYPE = "workloads"
DEFAULT_API_VERSION = "1.0"

# TODO: find out what should go here
API_VERSION_OPTION = "os-workload-api-version"
API_NAME = "workloadmgr"

# Both spellings of the version resolve to the same client class path.
API_VERSIONS = dict.fromkeys(("1.0", "1"), "workloadmgrclient.v1.client.Client")

logger = logging.getLogger(__name__)
class BashCompletionCommand(command.Command):
    """Prints all of the commands and options for bash-completion."""

    def take_action(self, parsed_args):
        """Do nothing; this command currently produces no output.

        :param parsed_args: parsed command arguments (unused)
        """
        return None
# TODO verify this version value
# Version string the shell hands to the cliff application and to extension
# discovery.
VERSION = "1.0"
# Maps an API version to the namespace the shell's CommandManager is built
# with; only the default API version has an entry.
NAMESPACE_MAP = {DEFAULT_API_VERSION: "workloadmgr.cli.v1"}
def _set_commands_dict_for_compat(apiversion, command_manager):
global COMMANDS
COMMANDS = {
apiversion: dict(
(cmd, command_manager.find_command([cmd])[0])
for cmd in command_manager.commands
)
}
class WorkloadMgrClientArgumentParser(_argparse.ArgumentParser):
    """Argument parser whose error output points the user at ``help``."""

    def __init__(self, *args, **kwargs):
        """Forward every argument unchanged to the base parser."""
        super(WorkloadMgrClientArgumentParser, self).__init__(*args, **kwargs)

    def error(self, message):
        """error(message: string)

        Prints a usage message incorporating the message to stderr and
        exits.
        """
        self.print_usage(sys.stderr)
        # FIXME(lzyeval): if changes occur in argparse.ArgParser._check_value
        # Drop the " (choose from ...)" list that may trail the message.
        short_message = message.split(" (choose from")[0]
        # prog is "<main program> <sub-command>"; split on the first space.
        main_prog, _, sub_prog = self.prog.partition(" ")
        details = {
            "errmsg": short_message,
            "mainp": main_prog,
            "subp": sub_prog,
        }
        text = (
            "error: %(errmsg)s\nTry '%(mainp)s help %(subp)s'"
            " for more information.\n"
        ) % details
        self.exit(2, text)
class HelpAction(argparse.Action):
    """Print help message including sub-commands

    Provide a custom action so the -h and --help options
    to the main app will print a list of the commands.
    The commands are determined by checking the CommandManager
    instance, passed in as the "default" value for the action.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Write the parser help and a one-line summary per command, then exit.

        Output goes to ``app.stdout``, where ``app`` is the object supplied as
        this action's ``default``. Always ends with ``sys.exit(0)``.

        :param parser: parser whose help text is printed first
        :param namespace: unused
        :param values: unused
        :param option_string: unused
        """
        app = self.default
        parser.print_help(app.stdout)
        app.stdout.write("\nCommands for API v{}:\n".format(app.api_version))

        summaries = []
        for name, ep in sorted(app.command_manager):
            factory = ep.load()
            # Commands are constructed with the application object; the
            # previous code passed this argparse action instead.
            cmd = factory(app, None)
            one_liner = cmd.get_description().split("\n")[0]
            summaries.append((name, one_liner))

        # Pad every name to the longest one so the summaries line up.
        max_len = max([len(name) for name, _ in summaries] or [0])
        for name, one_liner in summaries:
            app.stdout.write(" %s %s\n" % (name.ljust(max_len), one_liner))
        sys.exit(0)
class OpenStackWorkloadMgrShell(app.App):
    """cliff application behind the ``workloadmgr`` command.

    Builds the global option parser, registers extension commands, checks the
    authentication options and passes them to a ``ClientManager`` before a
    sub-command (or the interactive shell) runs.
    """

    def __init__(self, apiversion=DEFAULT_API_VERSION):
        """Create the application for one API version.

        :param apiversion: key into ``NAMESPACE_MAP`` selecting the command
            namespace
        :raises KeyError: if ``apiversion`` has no entry in ``NAMESPACE_MAP``
        """
        namespace = NAMESPACE_MAP[apiversion]
        description = (
            __doc__.strip() + " (workload manager CLI version: %s)" % __version__
        )
        super(OpenStackWorkloadMgrShell, self).__init__(
            description=description,
            version=VERSION,
            command_manager=commandmanager.CommandManager(namespace),
        )
        self._register_extensions(VERSION)
        # Pop the 'complete' to correct the outputs of 'help'. A default is
        # supplied so a manager without that command does not raise KeyError.
        self.command_manager.commands.pop("complete", None)
        # Placeholder; no code in this class assigns a client to it.
        self.auth_client = None
        self.api_version = apiversion
        # Set by setup_debugging() only when debugging is enabled.
        self.client_logger = None
        _set_commands_dict_for_compat(apiversion, self.command_manager)

    def build_option_parser(self, description, version):
        """Return the parser for the global (pre-command) options.

        Both parameters are accepted for interface compatibility but are not
        read: the parser description comes from the module docstring and the
        ``--version`` output from ``workloadmgrclient.__version__``.

        :param description: unused
        :param version: unused
        :returns: a configured ``WorkloadMgrClientArgumentParser``
        """
        parser = WorkloadMgrClientArgumentParser(
            prog="workloadmgr",
            description=__doc__.strip(),
            epilog="See 'workloadmgr help COMMAND' for help on a specific command.",
            add_help=False,
            formatter_class=OpenStackHelpFormatter,
        )
        # Global arguments
        if self.deferred_help:
            parser.add_argument(
                "-h",
                "--help",
                dest="deferred_help",
                action="store_true",
                help="Show help message and exit.",
            )
        else:
            parser.add_argument(
                "-h",
                "--help",
                action=HelpAction,
                nargs=0,
                default=self,  # tricky: HelpAction reads the app from here
                help="Show help message and exit.",
            )
        # NOTE(review): this reads workloadmgrclient.__version__ rather than
        # the __version__ imported from workloadmgrclient.version -- confirm
        # the package exposes that attribute.
        parser.add_argument(
            "--version", action="version", version=workloadmgrclient.__version__
        )
        parser.add_argument(
            "--debug",
            action="store_true",
            default=utils.env("WORKLOADMGRCLIENT_DEBUG", default=False),
            help="Print debugging output",
        )
        parser.add_argument(
            "--verbose", action="store_true", default=False, help="Print verbose output"
        )
        parser.add_argument(
            "--os-username",
            metavar="<auth-user-name>",
            default=utils.env("OS_USERNAME", "WORKLOADMGR_USERNAME"),
            help="Defaults to env[OS_USERNAME].",
        )
        parser.add_argument("--os_username", help=argparse.SUPPRESS)
        parser.add_argument(
            "--os-password",
            metavar="<auth-password>",
            default=utils.env("OS_PASSWORD", "WORKLOADMGR_PASSWORD"),
            help="Defaults to env[OS_PASSWORD].",
        )
        parser.add_argument("--os_password", help=argparse.SUPPRESS)
        parser.add_argument(
            "--os-tenant-name",
            metavar="<auth-tenant-name>",
            default=utils.env("OS_TENANT_NAME", "WORKLOADMGR_PROJECT_ID"),
            help="Defaults to env[OS_TENANT_NAME].",
        )
        parser.add_argument(
            "--os-tenant-id",
            metavar="<auth-tenant-id>",
            default=utils.env("OS_TENANT_ID", "WORKLOADMGR_TENANT_ID"),
            help="Defaults to env[OS_TENANT_ID].",
        )
        parser.add_argument(
            "--os-project-name",
            metavar="<auth-project-name>",
            default=utils.env("OS_PROJECT_NAME", "WORKLOADMGR_PROJECT_ID"),
            help="Defaults to env[OS_PROJECT_NAME].",
        )
        parser.add_argument(
            "--os-project-id",
            metavar="<auth-project-id>",
            default=utils.env("OS_PROJECT_ID", "WORKLOADMGR_TENANT_ID"),
            help="Defaults to env[OS_PROJECT_ID].",
        )
        parser.add_argument(
            "--os-auth-url",
            metavar="<auth-url>",
            default=utils.env("OS_AUTH_URL", "WORKLOADMGR_URL"),
            help="Defaults to env[OS_AUTH_URL].",
        )
        parser.add_argument("--os_auth_url", help=argparse.SUPPRESS)
        parser.add_argument(
            "--os-region-name",
            metavar="<region-name>",
            default=utils.env("OS_REGION_NAME", "WORKLOADMGR_REGION_NAME"),
            help="Defaults to env[OS_REGION_NAME].",
        )
        parser.add_argument("--os_region_name", help=argparse.SUPPRESS)
        parser.add_argument(
            "--os-domain-id",
            metavar="<domain-id>",
            default=utils.env("OS_DOMAIN_ID", default="default"),
            help="Defaults to env[OS_DOMAIN_ID].",
        )
        parser.add_argument("--os_domain_id", help=argparse.SUPPRESS)
        parser.add_argument(
            "--os-user-domain-name",
            metavar="<user-domain-name>",
            default=utils.env("OS_USER_DOMAIN_NAME", default=""),
            help="Defaults to env[OS_USER_DOMAIN_NAME].",
        )
        parser.add_argument(
            "--os-user-domain-id",
            metavar="<user-domain-id>",
            default=utils.env("OS_USER_DOMAIN_ID", default=""),
            help="Defaults to env[OS_USER_DOMAIN_ID].",
        )
        parser.add_argument(
            "--os-project-domain-name",
            metavar="<project-domain-name>",
            default=utils.env("OS_PROJECT_DOMAIN_NAME", default=""),
            help="Defaults to env[OS_PROJECT_DOMAIN_NAME].",
        )
        parser.add_argument(
            "--os-project-domain-id",
            metavar="<project-domain-id>",
            default=utils.env("OS_PROJECT_DOMAIN_ID", default=""),
            help="Defaults to env[OS_PROJECT_DOMAIN_ID].",
        )
        # run() falls back to DEFAULT_WORKLOADMGR_SERVICE_TYPE when this is
        # not given, so the help text names that value (it used to say
        # "compute").
        parser.add_argument(
            "--service-type",
            metavar="<service-type>",
            help="Defaults to " + DEFAULT_WORKLOADMGR_SERVICE_TYPE + ".",
        )
        parser.add_argument("--service_type", help=argparse.SUPPRESS)
        parser.add_argument(
            "--service-name",
            metavar="<service-name>",
            default=utils.env("WORKLOADMGR_SERVICE_NAME"),
            help="Defaults to env[WORKLOADMGR_SERVICE_NAME]",
        )
        parser.add_argument("--service_name", help=argparse.SUPPRESS)
        parser.add_argument(
            "--workload-service-name",
            metavar="<workload-service-name>",
            default=utils.env("WORKLOADMGR_VOLUME_SERVICE_NAME"),
            help="Defaults to env[WORKLOADMGR_VOLUME_SERVICE_NAME]",
        )
        parser.add_argument("--workload_service_name", help=argparse.SUPPRESS)
        parser.add_argument(
            "--endpoint-type",
            metavar="<endpoint-type>",
            default=utils.env(
                "WORKLOADMGR_ENDPOINT_TYPE", default=DEFAULT_WORKLOADMGR_ENDPOINT_TYPE
            ),
            help="Defaults to env[WORKLOADMGR_ENDPOINT_TYPE] or "
            + DEFAULT_WORKLOADMGR_ENDPOINT_TYPE
            + ".",
        )
        parser.add_argument("--endpoint_type", help=argparse.SUPPRESS)
        parser.add_argument(
            "--os-workload-api-version",
            metavar="<workload-api-ver>",
            default=utils.env(
                "OS_WORKLOAD_API_VERSION", default=DEFAULT_OS_WORKLOAD_API_VERSION
            ),
            help="Accepts 1, defaults to env[OS_WORKLOAD_API_VERSION].",
        )
        parser.add_argument("--os_workload_api_version", help=argparse.SUPPRESS)
        parser.add_argument(
            "--os-cacert",
            metavar="<ca-certificate>",
            default=utils.env("OS_CACERT", default=None),
            help="Specify a CA bundle file to use in "
            "verifying a TLS (https) server certificate. "
            "Defaults to env[OS_CACERT]",
        )
        # The help fragments previously ran together without separators
        # ("SSLcertificate validationDefaults ...").
        parser.add_argument(
            "--insecure",
            default=utils.env("INSECURE", default=False),
            action="store_true",
            help="Specify --insecure to suppress SSL "
            "certificate validation. "
            "Defaults to env[INSECURE]",
        )
        parser.add_argument(
            "--retries",
            metavar="<retries>",
            type=int,
            default=0,
            help="Number of retries.",
        )
        # FIXME(dtroyer): The args below are here for diablo compatibility,
        # remove them in folsum cycle
        # alias for --os-username, left in for backwards compatibility
        parser.add_argument("--username", help=argparse.SUPPRESS)
        # alias for --os-region_name, left in for backwards compatibility
        parser.add_argument("--region_name", help=argparse.SUPPRESS)
        # alias for --os-password, left in for backwards compatibility
        parser.add_argument(
            "--apikey",
            "--password",
            dest="apikey",
            default=utils.env("WORKLOADMGR_API_KEY"),
            help=argparse.SUPPRESS,
        )
        # alias for --os-auth-url, left in for backward compatibility
        parser.add_argument(
            "--url",
            "--auth_url",
            dest="url",
            default=utils.env("WORKLOADMGR_URL"),
            help=argparse.SUPPRESS,
        )
        return parser

    def _register_extensions(self, version):
        """Add the shell commands of every discovered extension module.

        Modules come from entry points, the Python path and the contrib
        directory, in that order.

        :param version: version string forwarded to discovery and filtering
        """
        for name, module in itertools.chain(
            client_extension.discover_via_entry_points(),
            self._discover_via_python_path(version),
            self._discover_via_contrib_path(version),
        ):
            self._extend_shell_commands(name, module, version)

    def _discover_via_python_path(self, version):
        """Yield ``(name, module)`` for importable extension modules.

        A module qualifies when its name ends with
        ``python_workloadmgrclient_ext``.

        :param version: unused
        """
        # TODO check if this can be consume in self.command_manager
        for (module_loader, name, ispkg) in pkgutil.iter_modules():
            if not name.endswith("python_workloadmgrclient_ext"):
                continue
            if not hasattr(module_loader, "load_module"):
                # Python 2.6 compat: actually get an ImpImporter obj
                module_loader = module_loader.find_module(name)
            module = module_loader.load_module(name)
            yield name, module

    def _discover_via_contrib_path(self, version):
        """Yield ``(name, module)`` for each ``*.py`` file in the contrib dir.

        The directory is ``<this file's dir>/v<version>/contrib``, with dots
        in ``version`` replaced by underscores; ``__init__.py`` is skipped.

        :param version: version string used to build the directory name
        """
        # TODO check if this can be consume in self.command_manager
        module_path = os.path.dirname(os.path.abspath(__file__))
        version_str = "v%s" % version.replace(".", "_")
        contrib_dir = os.path.join(module_path, version_str, "contrib")
        for source_file in glob.iglob(os.path.join(contrib_dir, "*.py")):
            # Strip the ".py" suffix to get the module name.
            name = os.path.basename(source_file)[:-3]
            if name == "__init__":
                continue
            # NOTE(review): the imp module is deprecated and removed in
            # Python 3.12; this needs an importlib replacement to run there.
            module = imp.load_source(name, source_file)
            yield name, module

    def _extend_shell_commands(self, name, module, version):
        """Register the shell commands defined by one extension module.

        A class is registered under its ``shell_command`` attribute when it
        subclasses ``client_extension.NeutronClientExtension`` and, if it
        declares ``versions``, lists ``version`` there. The class docstring
        is prefixed with ``[name]``.

        :param name: extension module name, used as the docstring prefix
        :param module: module object to inspect
        :param version: version the extension must support
        """
        for cls_name, cls in inspect.getmembers(module, inspect.isclass):
            if not (
                issubclass(cls, client_extension.NeutronClientExtension)
                and hasattr(cls, "shell_command")
            ):
                continue
            if hasattr(cls, "versions") and version not in cls.versions:
                continue
            cmd = cls.shell_command
            try:
                name_prefix = "[%s]" % name
                cls.__doc__ = (
                    "%s %s" % (name_prefix, cls.__doc__)
                    if cls.__doc__
                    else name_prefix
                )
                self.command_manager.add_command(cmd, cls)
            except TypeError as err:
                # Registration stays best-effort, but the skipped class is no
                # longer dropped without a trace.
                logger.debug(
                    "Skipping extension command %r from %s: %s", cmd, name, err
                )

    def _find_actions(self, subparsers, actions_module):
        """Create one sub-parser per ``do_*`` callable in ``actions_module``.

        The command name is the attribute name without ``do_`` and with
        underscores turned into hyphens. Each sub-parser receives the
        callable's ``arguments`` attribute (``(args, kwargs)`` pairs) and
        ``func`` defaults to the callable.

        :param subparsers: object providing ``add_parser``
        :param actions_module: module searched for ``do_*`` attributes
        """
        # TODO with register extensions check these as well
        for attr in (a for a in dir(actions_module) if a.startswith("do_")):
            # I prefer to be hypen-separated instead of underscores.
            command_name = attr[3:].replace("_", "-")
            callback = getattr(actions_module, attr)
            desc = callback.__doc__ or ""
            summary = desc.strip().split("\n")[0]
            arguments = getattr(callback, "arguments", [])
            subparser = subparsers.add_parser(
                command_name,
                help=summary,
                description=desc,
                add_help=False,
                formatter_class=OpenStackHelpFormatter,
            )
            subparser.add_argument(
                "-h", "--help", action="help", help=argparse.SUPPRESS,
            )
            # NOTE(review): self.subcommands is not assigned anywhere in this
            # class -- confirm it exists before this method is used.
            self.subcommands[command_name] = subparser
            for (args, kwargs) in arguments:
                subparser.add_argument(*args, **kwargs)
            subparser.set_defaults(func=callback)

    def setup_debugging(self, debug):
        """Configure logging for the shell.

        The module logger always gets a stream handler at WARNING level; with
        ``debug`` set it is raised to DEBUG and the client module's logger is
        given a DEBUG stream handler as well.

        :param debug: truthy to enable debug output
        """
        # Send higher-level messages to the console via stderr
        logging.basicConfig()
        stream_handler = logging.StreamHandler()
        formatter = "%(levelname)s (%(module)s:%(lineno)d) %(message)s"
        logger.setLevel(logging.WARNING)
        logging.getLogger("iso8601.iso8601").setLevel(logging.WARNING)
        logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
        stream_handler.setFormatter(logging.Formatter(formatter))
        logger.addHandler(stream_handler)
        if not debug:
            return
        logger.setLevel(logging.DEBUG)
        self.client_logger = logging.getLogger(client.__name__)
        ch = logging.StreamHandler()
        self.client_logger.setLevel(logging.DEBUG)
        self.client_logger.addHandler(ch)

    def _run_extension_hooks(self, hook_type, *args, **kwargs):
        """Run hooks for all registered extensions."""
        # TODO check it's use and decide
        # NOTE(review): self.extensions is not assigned anywhere in this
        # class -- confirm it is set elsewhere before calling this.
        for extension in self.extensions:
            extension.run_hooks(hook_type, *args, **kwargs)

    def authenticate_user(self):
        """Confirm user authentication

        Make sure the user has provided all of the authentication
        info we need, filling the ``os_*`` options from their legacy aliases
        where possible, then build and initialize ``self.client_manager``.

        :raises exc.CommandError: if a username, password, auth URL, project
            id, project name, user domain or project domain is missing
        """
        # FIXME(usrleon): Here should be restrict for project id same as
        # for os_username or os_password but for compatibility it is not.
        # TODO: check for isunauthenticated condition here
        # if not utils.isunauthenticated(args.func):
        options = self.options

        if not options.os_username:
            if not options.username:
                raise exc.CommandError(
                    "You must provide a username "
                    "via either --os-username or env[OS_USERNAME]"
                )
            options.os_username = options.username

        if not options.os_password:
            if not options.apikey:
                raise exc.CommandError(
                    "You must provide a password "
                    "via either --os-password or via "
                    "env[OS_PASSWORD]"
                )
            options.os_password = options.apikey

        # From here on os_auth_url is guaranteed to be set, so the second,
        # unreachable auth-url check that used to follow was removed.
        if not options.os_auth_url:
            if not options.url:
                raise exc.CommandError(
                    "You must provide an auth url "
                    "via either --os-auth-url or env[OS_AUTH_URL]"
                )
            options.os_auth_url = options.url

        if not options.os_region_name and options.region_name:
            options.os_region_name = options.region_name

        if not (options.os_project_id or options.os_tenant_id):
            raise exc.CommandError(
                "You must provide a project_id "
                "via either --os-project-id or env[OS_PROJECT_ID]"
            )
        if options.os_tenant_id and not options.os_project_id:
            options.os_project_id = options.os_tenant_id

        if not (options.os_project_name or options.os_tenant_name):
            raise exc.CommandError(
                "You must provide a project_name "
                "via either --os-project-name or env[OS_PROJECT_NAME]"
            )
        if options.os_tenant_name and not options.os_project_name:
            options.os_project_name = options.os_tenant_name

        if not (options.os_user_domain_name or options.os_user_domain_id):
            raise exc.CommandError(
                "You must provide a user_domain_name or user_domain_id "
                "for user domain: use --os-user-domain-name or env[OS_USER_DOMAIN_NAME] or "
                "for user id: use --os-user-domain-id or env[OS_USER_DOMAIN_ID]"
            )
        if not (options.os_project_domain_name or options.os_project_domain_id):
            raise exc.CommandError(
                "You must provide a project_domain_name or project_domain_id "
                "for project domain: use --os-project-domain-name or env[OS_PROJECT_DOMAIN_NAME] or "
                "for project id: use --os-project-domain-id or env[OS_PROJECT_DOMAIN_ID]"
            )

        # Note the mapping: project_id receives the project *name* and
        # tenant_id receives the project *id*. Empty domain strings are
        # passed on as None.
        self.client_manager = ClientManager(
            username=options.os_username,
            password=options.os_password,
            project_id=options.os_project_name,
            auth_url=options.os_auth_url,
            insecure=options.insecure,
            tenant_id=options.os_project_id,
            region_name=options.os_region_name,
            endpoint_type=options.endpoint_type,  # or self.endpoint_type,
            service_type=options.service_type,
            service_name=options.service_name,
            ca_cert=options.os_cacert,
            retries=options.retries,
            os_domain_id=options.os_domain_id,
            os_user_domain_id=options.os_user_domain_id or None,
            os_project_domain_id=options.os_project_domain_id or None,
            user_domain_name=options.os_user_domain_name or None,
            project_domain_name=options.os_project_domain_name or None,
            logger=self.client_logger,
            # timeout=self.options.http_timeout,
        )
        self.client_manager.initialize()
        return

    def initialize_app(self, argv):
        """Global app init bits:

        * set up API versions
        * validate authentication info

        Authentication is skipped only when a command was given and it
        resolves to ``help``.

        :param argv: arguments left over after the global options
        """
        super(OpenStackWorkloadMgrShell, self).initialize_app(argv)
        # If the user is not asking for help, make sure they
        # have given us auth.
        cmd_name = None
        if argv:
            _factory, cmd_name, _sub_argv = self.command_manager.find_command(argv)
        if self.interactive_mode or cmd_name != "help":
            self.authenticate_user()

    def run(self, argv):
        """Equivalent to the main program for the application.

        Rewrites help requests into a form the parser handles, parses the
        global options, applies defaults, then runs either the interactive
        shell (no arguments left) or the requested sub-command.

        :param argv: input arguments and options
        :paramtype argv: list of str
        :returns: whatever ``interact()`` or ``run_subcommand()`` returns
        """
        command_pos = -1
        help_pos = -1
        help_command_pos = -1
        # Remember the first position of each kind of token.
        for index, arg in enumerate(argv):
            # Disabled bash-completion hook, kept for reference:
            # if arg == 'bash-completion' and help_command_pos == -1:
            #     self._bash_completion()
            #     return 0
            if arg in ("-h", "--help"):
                if help_pos == -1:
                    help_pos = index
            # self.command_manager.commands contains 'help',
            # so we need to check this first.
            elif arg == "help":
                if help_command_pos == -1:
                    help_command_pos = index
            elif arg in self.command_manager.commands:
                if command_pos == -1:
                    command_pos = index

        # "<command> ... -h" becomes "help <command>".
        if command_pos > -1 and help_pos > command_pos:
            argv = ["help", argv[command_pos]]
        # A bare "help" with no command becomes the global "--help".
        if help_command_pos > -1 and command_pos == -1:
            argv[help_command_pos] = "--help"

        self.options, remainder = self.parser.parse_known_args(argv)

        # The default may be a string taken from the environment; map the
        # spellings of true/false (now in any letter case) to real booleans
        # so that e.g. "False" does not count as truthy.
        insecure = self.options.insecure
        if isinstance(insecure, six.string_types):
            lowered = insecure.lower()
            if lowered == "true":
                self.options.insecure = True
            elif lowered == "false":
                self.options.insecure = False

        if not self.options.endpoint_type:
            self.options.endpoint_type = DEFAULT_WORKLOADMGR_ENDPOINT_TYPE
        if not self.options.service_type:
            # TODO: check how to pass service_type
            self.options.service_type = DEFAULT_WORKLOADMGR_SERVICE_TYPE

        self.setup_debugging(self.options.debug)
        self.interactive_mode = not remainder
        self.initialize_app(remainder)

        if self.interactive_mode:
            # Keep only the program name for the interactive session.
            sys.argv = [sys.argv[0]]
            return self.interact()
        return self.run_subcommand(remainder)
# I'm picky about my shell help.
class OpenStackHelpFormatter(argparse.HelpFormatter):
    """Help formatter that upper-cases the first letter of section headings."""

    def start_section(self, heading):
        """Start a help section with its heading's first letter capitalized.

        :param heading: section title; ``None`` or an empty string is passed
            through unchanged
        """
        # argparse passes None for argument groups created without a title,
        # so only index into headings that actually have characters.
        if heading:
            heading = "%s%s" % (heading[0].upper(), heading[1:])
        super(OpenStackHelpFormatter, self).start_section(heading)
def main():
    """Run the shell with the process arguments.

    :returns: the value returned by ``OpenStackWorkloadMgrShell.run`` so the
        caller can use it as the process exit status (it used to be
        discarded)
    :raises SystemExit: with status 130 on ``KeyboardInterrupt`` and status 1
        on any other exception, after printing the error to stderr
    """
    try:
        return OpenStackWorkloadMgrShell().run(
            list(map(strutils.safe_decode, sys.argv[1:]))
        )
    except KeyboardInterrupt:
        print("... terminating workloadmgr client", file=sys.stderr)
        sys.exit(130)
    except Exception as e:
        logger.debug(e, exc_info=1)
        # str() always yields a string, so the former isinstance re-check
        # could never trigger and was dropped.
        message = str(e)
        print(
            "ERROR: {}".format(strutils.safe_encode(message).decode()), file=sys.stderr
        )
        sys.exit(1)


if __name__ == "__main__":
    sys.exit(main())