Repository URL to install this package:
|
Version:
4.1.94-4.1 ▾
|
python3-workloadmgrclient-el8
/
usr
/
lib
/
python3.6
/
site-packages
/
workloadmgrclient
/
v1
/
workload.py
|
|---|
import os
import yaml
from cliff import show, lister
from osc_lib import exceptions
from osc_lib import utils as osc_utils
from workloadmgrclient import timezone
from workloadmgrclient import utils
from workloadmgrclient.openstack.common import strutils
from workloadmgrclient.v1 import WorkloadmgrCommand
reassign_map_path = os.path.abspath(
(
os.path.join(
os.path.dirname(__file__), "../input-files/workload_reassign_map_file.yaml"
)
)
)
class WorkloadCommand(WorkloadmgrCommand):
resource = "workloads"
def _produce_verbose_output(self, metadata, jobschedule):
if "topology" in metadata:
metadata.pop("topology")
if "workloadgraph" in metadata:
metadata.pop("workloadgraph")
utils.print_dict(metadata, "Metadata")
utils.print_dict(jobschedule, "Jobschedule")
class ListWorkload(WorkloadCommand, lister.Lister):
"""List all the workloads of current project."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"--all",
type=strutils.bool_from_string,
metavar="{True,False}",
help="List all workloads of all the projects (valid for admin user only)",
default=False,
)
parser.add_argument(
"--nfsshare",
metavar="<nfsshare>",
help="List all workloads of nfsshare (valid for admin user only)",
default=None,
)
def take_action(self, parsed_args):
client = self.get_client()
search_opts = {
"all_workloads": parsed_args.all,
"nfs_share": parsed_args.nfsshare,
}
workload_objs = client.list(search_opts=search_opts) or []
return (
["ID", "Name", "ProjectID", "WorkloadTypeID", "Status", "CreatedAt"],
(
osc_utils.get_item_properties(
obj,
[
"id",
"name",
"project_id",
"workload_type_id",
"status",
"created_at",
],
)
for obj in workload_objs
),
)
class ShowWorkload(WorkloadCommand, show.ShowOne):
"""Show details about a workload."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"workload_id",
metavar="<workload_id>",
help="ID of the workload."
)
parser.add_argument(
"--scheduler_trust",
type=bool,
metavar="<scheduler_trust {true}>",
help="scheduler_trust."
)
def take_action(self, parsed_args):
client = self.get_client()
scheduler_trust = False
# checks for scheduler_trust key and pass it to find_resource if present.
if parsed_args.scheduler_trust:
scheduler_trust = {'scheduler_trust': parsed_args.scheduler_trust}
workload = utils.find_resource(client, parsed_args.workload_id, scheduler_trust)
else:
workload = utils.find_resource(client, parsed_args.workload_id)
info = {}
info.update(workload._info)
info.pop("links", None)
metadata = info.pop("metadata", {})
job_schedule = info.pop("jobschedule", {})
info["jobschedule"] = job_schedule["enabled"]
if "timezone" in job_schedule:
date_time = utils.get_local_time(
job_schedule["start_date"] + " " + job_schedule["start_time"],
"%m/%d/%Y %I:%M %p",
"%m/%d/%Y %I:%M %p",
timezone.get_localzone().zone,
job_schedule["appliance_timezone"],
).split(" ")
job_schedule["start_date"] = date_time[0]
job_schedule["start_time"] = date_time[1] + " " + date_time[2]
job_schedule["timezone"] = timezone.get_localzone().zone
for i, val in enumerate(info["instances"]):
info["instances"][i].pop("metadata", None)
# TODO: check for better approach here
if getattr(
self.app.options,
"verbose",
getattr(self.app.options, "verbose_level", None),
):
self._produce_verbose_output(metadata, job_schedule)
columns = list(info.keys())
data = osc_utils.get_dict_properties(info, columns)
return columns, data
class CreateWorkload(WorkloadCommand, lister.Lister):
"""Creates a workload."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"--instance",
metavar="<instance-id=instance-uuid>",
action="append",
dest="instances",
required=True,
help="Required to set atleast one instance,"
" Specify an instance to include in the workload."
" Specify option multiple times to include multiple instances. "
"instance-id: include the instance with this UUID ",
)
parser.add_argument(
"--display-name",
metavar="<display-name>",
help="Optional workload name. (Default=None)",
default=None,
)
parser.add_argument(
"--display-description",
metavar="<display-description>",
help="Optional workload description. (Default=None)",
default=None,
)
parser.add_argument(
"--workload-type-id",
metavar="<workload-type-id>",
help="Optional Workload Type ID, (Default=Serial)",
default=None,
)
parser.add_argument(
"--source-platform",
metavar="<source-platform>",
help="Optional workload source platform (Default=None)",
default=None,
)
parser.add_argument(
"--jobschedule",
metavar="<key=key-name>",
action="append",
dest="jobschedule",
default=[],
help="Specify following key value pairs for jobschedule "
"Specify option multiple times to include multiple keys. "
"If don't specify timezone, then by default it takes your local machine timezone"
" 'start_date' : '06/05/2014' "
" 'end_date' : '07/15/2014' "
" 'start_time' : '2:30 PM' "
" 'interval' : '1 hr' "
" 'retention_policy_type' : "
"'Number of Snapshots to Keep' or 'Number of days to retain Snapshots' "
" 'retention_policy_value' : '30' "
" 'timezone: '' , "
"For example --jobschedule start_date='mm/dd/yy' --jobschedule enabled=True"
"In order to enable/disable scheduler pass enabled True / enabled False",
)
parser.add_argument(
"--metadata",
metavar="<key=key-name>",
action="append",
dest="metadata",
default=[],
help="Specify a key value pairs to include in the workload_type metadata "
"Specify option multiple times to include multiple keys. "
"key=value",
)
parser.add_argument(
"--policy-id", metavar="<policy_id>", help="Policy ID", default=None
)
def take_action(self, parsed_args):
client = self.get_client()
instances = []
for instance_str in parsed_args.instances:
err_msg = (
"Invalid instance argument '%s'. Instance arguments must be of the "
"form --instance <instance-id=instance-uuid>" % instance_str
)
instance_info = {"instance-id": ""}
for kv_str in instance_str.split(","):
try:
k, v = kv_str.split("=", 1)
except ValueError:
raise exceptions.CommandError(err_msg)
if k in instance_info:
instance_info[k] = v
else:
raise exceptions.CommandError(err_msg)
if not instance_info["instance-id"]:
raise exceptions.CommandError(err_msg)
instances.append(instance_info)
job_schedule = {}
for jobschedule_str in parsed_args.jobschedule:
err_msg = (
"Invalid jobschedule argument '%s'. jobschedule arguments must be of the "
"form --jobschedule <key=value>" % jobschedule_str
)
for kv_str in jobschedule_str.split(","):
try:
k, v = kv_str.split("=", 1)
except ValueError:
raise exceptions.CommandError(err_msg)
if k in job_schedule:
try:
import pytz
pytz.timezone(v)
except Exception:
raise exceptions.CommandError("Specify valid timeone " + v)
job_schedule[k] = v
else:
job_schedule.setdefault(k, v)
if len(job_schedule) >= 1 and "enabled" not in job_schedule:
raise exceptions.CommandError(
"Please specify --jobschedule enabled option in order to set scheduler for this workload"
)
else:
if "timezone" in job_schedule:
job_schedule[k] = v
else:
job_schedule.setdefault("timezone", timezone.get_localzone().zone)
metadata = {}
for metadata_str in parsed_args.metadata:
err_msg = (
"Invalid metadata argument '%s'. metadata arguments must be of the "
"form --metadata <key=value>" % metadata_str
)
for kv_str in metadata_str.split(","):
try:
k, v = kv_str.split("=", 1)
except ValueError:
raise exceptions.CommandError(err_msg)
if k in metadata:
metadata[k] = v
else:
metadata.setdefault(k, v)
if parsed_args.policy_id is not None:
metadata["policy_id"] = parsed_args.policy_id
"""
workloadids = []
err_msg = ("Composite workload type must specify workload-ids ")
workload_type = _find_workload_type(cs, args.workload_type_id)
if workload_type.name == 'Composite':
if len(args.workloadids) == 0:
raise exceptions.CommandError(err_msg)
workload_info = []
composite = {}
workload_data = []
for ids in args.workloadids:
workload = _find_workload(cs, ids)
d = {}
d["text"] = str(workload.name)
d["value"] = str(workload.id)
workload_info.append(d)
inner_list = []
d = {}
d["type"] = "workload"
info = dict()
info.update(workload._info)
info['jobschedule']['enabled'] = str(info['jobschedule']['enabled'])
info['interval'] = str(info['interval'])
d['data'] = ast.literal_eval(json.dumps(info))
inner_list.append(d)
dt = {}
dt["flow"] = "serial"
dt['children'] = inner_list
workload_data.append(dt)
d = {}
d["flow"] = "serial"
d["children"] = workload_data
composite['compworkloads'] = json.dumps(workload_info)
composite['workloadgraph'] = json.dumps(d)
if workload_type.name == 'Composite':
metadata = composite
"""
workload_obj = client.create(
parsed_args.display_name,
parsed_args.display_description,
parsed_args.workload_type_id,
parsed_args.source_platform,
instances,
job_schedule,
metadata,
)
if workload_obj:
return (
["ID", "Name", "Status"],
(
osc_utils.get_item_properties(obj, ["id", "name", "status"])
for obj in [workload_obj]
),
)
return
class ModifyWorkload(WorkloadCommand):
"""Modify a workload."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"workload_id", metavar="<workload_id>", help="ID of the workload."
)
parser.add_argument(
"--display-name",
metavar="<display-name>",
help="Optional workload name. (Default=None)",
default=None,
)
parser.add_argument(
"--display-description",
metavar="<display-description>",
help="Optional workload description. (Default=None)",
default=None,
)
parser.add_argument(
"--instance",
metavar="<instance-id=instance-uuid>",
action="append",
dest="instances",
default=[],
help="Specify an instance to include in the workload. "
"Specify option multiple times to include multiple instances. "
"instance-id: include the instance with this UUID ",
)
parser.add_argument(
"--jobschedule",
metavar="<key=key-name>",
action="append",
dest="jobschedule",
default=[],
help="Specify following key value pairs for jobschedule "
"Specify option multiple times to include multiple keys. "
"If don't specify timezone, then by default it takes your local machine timezone"
" 'start_date' : '06/05/2014' "
" 'end_date' : '07/15/2014' "
" 'start_time' : '2:30 PM' "
" 'interval' : '1 hr' "
" 'retention_policy_type' : "
"'Number of Snapshots to Keep' or 'Number of days to retain Snapshots' "
" 'retention_policy_value' : '30' ",
)
parser.add_argument(
"--metadata",
metavar="<key=key-name>",
action="append",
dest="metadata",
default=[],
help="Specify a key value pairs to include in the workload_type metadata "
"Specify option multiple times to include multiple keys. "
"key=value",
)
parser.add_argument(
"--policy-id", metavar="<policy_id>", help="Policy ID", default=None
)
def take_action(self, parsed_args):
client = self.get_client()
workload_obj = utils.find_resource(client, parsed_args.workload_id)
instances = []
for instance_str in parsed_args.instances:
err_msg = (
"Invalid instance argument '%s'. Instance arguments must be of the "
"form --instance <instance-id=instance-uuid>" % instance_str
)
instance_info = {"instance-id": ""}
for kv_str in instance_str.split(","):
try:
k, v = kv_str.split("=", 1)
except ValueError:
raise exceptions.CommandError(err_msg)
if k in instance_info:
instance_info[k] = v
else:
raise exceptions.CommandError(err_msg)
if not instance_info["instance-id"]:
raise exceptions.CommandError(err_msg)
instances.append(instance_info)
job_schedule = {}
for jobschedule_str in parsed_args.jobschedule:
err_msg = (
"Invalid jobschedule argument '%s'. jobschedule arguments must be of the "
"form --jobschedule <key=value>" % jobschedule_str
)
for kv_str in jobschedule_str.split(","):
try:
k, v = kv_str.split("=", 1)
except ValueError:
raise exceptions.CommandError(err_msg)
if k in job_schedule:
try:
import pytz
pytz.timezone(v)
except Exception:
raise exceptions.CommandError("Specify valid timeone " + v)
job_schedule[k] = v
else:
job_schedule.setdefault(k, v)
if len(job_schedule) >= 1 and "enabled" not in job_schedule:
raise exceptions.CommandError(
"Please specify --jobschedule enabled option in order to set scheduler for this workload"
)
else:
if "timezone" in job_schedule:
job_schedule[k] = v
else:
job_schedule.setdefault("timezone", timezone.get_localzone().zone)
metadata = {}
for metadata_str in parsed_args.metadata:
err_msg = (
"Invalid metadata argument '%s'. metadata arguments must be of the "
"form --metadata <key=value>" % metadata_str
)
for kv_str in metadata_str.split(","):
try:
k, v = kv_str.split("=", 1)
except ValueError:
raise exceptions.CommandError(err_msg)
if k in metadata:
metadata[k] = v
else:
metadata.setdefault(k, v)
if parsed_args.policy_id is not None:
metadata["policy_id"] = parsed_args.policy_id
workload_obj.update(
workload_obj.id,
parsed_args.display_name,
parsed_args.display_description,
instances,
job_schedule,
metadata,
)
return
class DeleteWorkload(WorkloadCommand):
"""Remove a workload."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"--database_only",
metavar="<True/False>",
help="Keep True if want to delete from database only.(Default=False)",
default=False,
)
parser.add_argument(
"workload_id", metavar="<workload_id>", help="ID of the workload to delete."
)
def take_action(self, parsed_args):
client = self.get_client()
workload = utils.find_resource(client, parsed_args.workload_id)
workload.delete(parsed_args.database_only)
return
class ResetWorkload(WorkloadCommand):
"""reset a workload."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"workload_id", metavar="<workload_id>", help="ID of the workload to reset."
)
def take_action(self, parsed_args):
client = self.get_client()
workload_obj = utils.find_resource(client, parsed_args.workload_id)
workload_obj.reset()
class UnlockWorkload(WorkloadCommand):
"""unlock a workload."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"workload_id", metavar="<workload_id>", help="ID of the workload to reset."
)
def take_action(self, parsed_args):
client = self.get_client()
workload_obj = utils.find_resource(client, parsed_args.workload_id)
workload_obj.unlock()
class WorkloadAuditlog(WorkloadCommand, lister.Lister):
"""Get auditlog of workload manager"""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"--time_in_minutes",
metavar="<time_in_minutes>",
default=1440,
help="time in minutes(default is 24 hrs.)",
)
parser.add_argument(
"--time_from",
metavar="<time_from>",
help="From date time in format 'MM-DD-YYYY'",
default=None,
)
parser.add_argument(
"--time_to",
metavar="<time_to>",
help="To date time in format 'MM-DD-YYYY'(defult is current day)",
default=None,
)
def take_action(self, parsed_args):
client = self.get_client()
audit_logs = client.get_auditlog(
parsed_args.time_in_minutes, parsed_args.time_from, parsed_args.time_to
)
utils.print_list(
audit_logs["auditlog"],
["UserName", "ObjectName", "Timestamp", "UserId", "Details"],
)
# TODO: check why following block is not printing expected output
"""return (['UserName', 'ObjectName', 'Timestamp', 'UserId', 'Details'],
(osc_utils.get_dict_properties(
obj,
['UserName', 'ObjectName', 'Timestamp', 'UserId', 'Details']
) for obj in audit_logs['auditlog']),)
"""
class ImportWorkloadList(WorkloadCommand, lister.Lister):
"""Get list of workloads to be imported."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"--project_id",
metavar="<project_id>",
default=None,
help="List workloads belongs to given project only.",
)
def take_action(self, parsed_args):
client = self.get_client()
workload_objs = client.get_importworkloads_list(
project_id=parsed_args.project_id
)
return (
["ID", "Name", "Workload_Type_ID", "Project_ID"],
(
osc_utils.get_dict_properties(
obj, ["id", "name", "workload_type_id", "project_id"]
)
for obj in workload_objs
),
)
class WorkloadNodes(WorkloadCommand, lister.Lister):
"""Get all the nodes of a workload manager"""
def take_action(self, parsed_args):
client = self.get_client()
result = client.get_nodes()
return (
["Node", "ID", "Version", "IPAddress", "IsController", "Status", "IsVIP"],
(
osc_utils.get_dict_properties(
obj,
[
"node",
"id",
"version",
"ipaddress",
"is_controller",
"status",
"is_vip",
],
)
for obj in result["nodes"]
),
)
class ListOrphanedWorkload(WorkloadCommand, lister.Lister):
"""List all the orphaned workloads having tenant_id or user_id which doesn't belong to current cloud."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"--migrate_cloud",
metavar="{True,False}",
type=strutils.bool_from_string,
default=False,
help="Set to True if want to list workloads from other clouds as well."
" Default if False",
)
parser.add_argument(
"--generate_yaml",
metavar="{True,False}",
type=strutils.bool_from_string,
default=False,
help="Set to True if want to generate output file in yaml format,"
" which would be further used as input for workload reassign API.",
)
def take_action(self, parsed_args):
client = self.get_client()
orphaned_workloads = client.get_orphaned_workloads_list(parsed_args)
if parsed_args.generate_yaml:
print(
"\nPlease find map file at " + str(os.getcwd()) + "/reassign_map.yaml\n"
)
return (
["Name", "ID", "Project ID", "User ID"],
(
osc_utils.get_dict_properties(
obj, ["name", "id", "project_id", "user_id"]
)
for obj in orphaned_workloads
),
)
class StorageUsageWorkload(WorkloadCommand, show.ShowOne):
"""Get total storage used by workload manager"""
def take_action(self, parsed_args):
client = self.get_client()
storage_usage_info = client.get_storage_usage()
for storage in storage_usage_info["storage_usage"]:
for key, val in storage.items():
if type(val).__name__ == "int" or type(val).__name__ == "float":
val = str(val) + " Bytes or Approx ( " + utils.bytes_fmt(val) + " )"
storage[key] = str(val)
utils.print_dict(storage, wrap=100)
for key, val in storage_usage_info["count_dict"].items():
if type(val).__name__ == "int" or type(val).__name__ == "float":
val = str(val) + " Bytes or Approx ( " + utils.bytes_fmt(val) + " )"
storage_usage_info["count_dict"][key] = str(val)
columns = list(storage_usage_info["count_dict"].keys())
data = osc_utils.get_dict_properties(storage_usage_info["count_dict"], columns)
return columns, data
class SnapshotWorkload(WorkloadCommand):
"""Snapshot a workload."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"workload_id",
metavar="<workload_id>",
help="ID of the workload to snapshot.",
)
parser.add_argument(
"--full",
dest="full",
action="store_true",
default=False,
help="Specify if a full snapshot is required.",
)
parser.add_argument(
"--display-name",
metavar="<display-name>",
help="Optional snapshot name. (Default=None)",
default=None,
)
parser.add_argument(
"--display-description",
metavar="<display-description>",
help="Optional snapshot description. (Default=None)",
default=None,
)
def take_action(self, parsed_args):
client = self.get_client()
workload_obj = utils.find_resource(client, parsed_args.workload_id)
workload_obj.snapshot(
parsed_args.full, parsed_args.display_name, parsed_args.display_description
)
return
class RemoveNode(WorkloadCommand):
"""Remove workload node by ip-address / hostname"""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"ip", metavar="<ip>", help="IP or hostname of node to remove"
)
def take_action(self, parsed_args):
client = self.get_client()
client.remove_node(parsed_args.ip)
return
class ImportWorkloads(WorkloadCommand, lister.Lister):
"""Import all workload records from backup store."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"--workloadids",
metavar="<workloadid>",
action="append",
dest="workloadids",
default=[],
help="Specify workload ids to import only specified workloads"
" --workloadids <workloadid> --workloadids <workloadid>",
)
def take_action(self, parsed_args):
client = self.get_client()
result = client.importworkloads(parsed_args.workloadids)
if len(result["failed_workloads"]):
message = (
"\nPlease verify failed workload id's are valid.\n"
+ "If workload project not exist in current cloud then use workload_reassign.\n"
)
utils.print_data_vertically([result["failed_workloads"]], ["Failed_Workloads"])
print(message)
if result['imported_workloads']:
print("Following workloads are imported successfully")
return (
["Name", "ID", "ProjectID", "UserID"],
(
osc_utils.get_dict_properties(obj, ["name", "id", "project_id", "user_id"])
for obj in result['imported_workloads']
),
)
class ReassignWorkloads(WorkloadCommand, lister.Lister):
"""Assign workload to a new tenant/user."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"--old_tenant_ids",
metavar="<old_tenant_id>",
action="append",
dest="old_tenant_ids",
default=[],
help="Specify old tenant ids from which workloads need to reassign to new tenant."
" --old_tenant_ids <old_tenant_id> --old_tenant_ids <old_tenant_id>",
)
parser.add_argument(
"--new_tenant_id",
metavar="<new_tenant_id>",
default=None,
help="Specify new tenant id to which workloads need to reassign from old tenant."
" --new_tenant_id <new_tenant_id>",
)
parser.add_argument(
"--workload_ids",
metavar="<workload_id>",
action="append",
dest="workload_ids",
default=[],
help="Specify workload_ids which need to reassign to new tenant. "
"If not provided then all the workloads from old tenant "
"will get reassigned to new tenant. "
"--workload_ids <workload_id> --workload_ids <workload_id>",
)
parser.add_argument(
"--user_id",
metavar="<user_id>",
default=None,
help="Specify user id to which"
" workloads need to reassign from old tenant. "
"--user_id <user_id>",
)
parser.add_argument(
"--migrate_cloud",
metavar="{True,False}",
type=strutils.bool_from_string,
default=False,
help="Set to True if want to reassign workloads from other clouds as well. Default if False",
)
parser.add_argument(
"--map_file",
metavar="<map_file>",
default=None,
type=open,
help="Provide file path(relative or absolute) including file name of reassign map file."
" Provide list of old workloads mapped to new tenants. "
"Format for this file is YAML. For sample, please refer "
"to this file: %s ." % reassign_map_path,
)
def take_action(self, parsed_args):
client = self.get_client()
if parsed_args.map_file is not None:
data = yaml.load(read_file(parsed_args.map_file), Loader=yaml.SafeLoader)
if not isinstance(data, dict) or data.get("reassign_mappings", None):
message = (
"File content is not in required yaml format, "
+ "Please provide require data in appropriate format."
)
raise exceptions.CommandError(message)
parsed_args.map_file = data
result = client.reassign_workloads(parsed_args)
if len(result["failed_workloads"]):
message = "\nPlease verify failed workload id's are valid.\n"
utils.print_data_vertically([result["failed_workloads"]], ["Failed_Workloads"])
print(message)
return (
["Name", "ID", "ProjectID", "UserID"],
(
osc_utils.get_dict_properties(
obj, ["name", "id", "project_id", "user_id"]
)
for obj in result["reassigned_workloads"]
),
)
class AddSettings(WorkloadCommand, show.ShowOne):
"""Add workload settings"""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"--setting",
metavar="<key=key-name>",
action="append",
dest="settings",
required=True,
help="Required, Specify a key value pairs to include in the settings "
"Specify option multiple times to include multiple settings. "
"key=value",
)
def take_action(self, parsed_args):
client = self.get_client()
settings = {}
for settings_str in parsed_args.settings:
err_msg = (
"Invalid settings argument '%s'. settings arguments must be of the "
"form --setting <key=value>" % settings_str
)
for kv_str in settings_str.split(","):
try:
k, v = kv_str.split("=", 1)
except ValueError:
raise exceptions.CommandError(err_msg)
if k in settings:
settings[k] = v
else:
settings.setdefault(k, v)
settings = client.settings(settings)
columns = list(settings.keys())
data = osc_utils.get_dict_properties(settings, columns)
return columns, data
class WorkloadEnableScheduler(WorkloadCommand):
"""enables workloads' scheduler"""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"--workloadids",
metavar="<workloadid>",
action="append",
dest="workloadids",
required=True,
help="Required atleast one workloadid ,"
"Specify an ID of the workload whose scheduler enables. "
"Specify option multiple times to include multiple workloads."
" --workloadids <workloadid> --workloadids <workloadid>",
)
def take_action(self, parsed_args):
client = self.get_client()
search_opts = {"workload_list": parsed_args.workloadids}
workload_objs = client.list(search_opts=search_opts) or []
resumed_list = []
for workload_obj in workload_objs:
try:
workload_obj.update(
workload_id=workload_obj.id,
name=None,
description=None,
instances=None,
jobschedule={'enabled': '1'},
metadata=None,
)
resumed_list.append(workload_obj.id)
except Exception as ex:
raise ex
if len(resumed_list) < len(parsed_args.workloadids):
msg = (
"Only following workloads' schedulers are resumed: %s \n."
"Remaining provided are either not present or invalid"
% str(resumed_list)
)
raise exceptions.CommandError(msg)
class WorkloadDisableScheduler(WorkloadCommand):
"""disables workloads' scheduler"""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"--workloadids",
metavar="<workloadid>",
action="append",
dest="workloadids",
required=True,
help="Required atleast one workloadid ,"
"Specify an ID of the workload whose scheduler disables. "
"Specify option multiple times to include multiple workloads."
" --workloadids <workloadid> --workloadids <workloadid>",
)
def take_action(self, parsed_args):
client = self.get_client()
search_opts = {"workload_list": parsed_args.workloadids}
workload_objs = client.list(search_opts=search_opts) or []
paused_list = []
for workload_obj in workload_objs:
try:
workload_obj.pause()
paused_list.append(workload_obj.id)
except Exception as ex:
raise ex
if len(paused_list) < len(parsed_args.workloadids):
msg = (
"Only following workloads' schedulers are paused: %s \n."
"Remaining provided are not present or invalid" % str(paused_list)
)
raise exceptions.CommandError(msg)
class ProtectedVMs(WorkloadCommand):
"""Lists vms protected by tenant."""
def take_action(self, parsed_args):
client = self.get_client()
vms = client.get_protected_vms()
utils.print_list(vms["protected_vms"], ["ID"])
return
class TenantUsage(WorkloadCommand):
"""Returns storage used and vms protected by tenants."""
def take_action(self, parsed_args):
client = self.get_client()
usage = client.get_tenants_usage()
utils.print_dict(
usage["global_usage"], dict_property="Global Usage", dict_value="Values"
)
res = {
usage.pop("tenant_name", project_id): usage
for project_id, usage in usage["tenants_usage"].items()
}
utils.print_dict(res, dict_property="Project Name", dict_value="Usage")
return