Repository URL to install this package:
Version:
5.2.8 ▾
|
python3-workloadmgrclient
/
usr
/
lib
/
python3
/
dist-packages
/
workloadmgrclient
/
v1
/
policy.py
|
---|
import six
from cliff import show, lister
from osc_lib import exceptions
from osc_lib import utils as osc_utils
from workloadmgrclient import utils
from workloadmgrclient.v1 import WorkloadmgrCommand
from workloadmgrclient.v1.validators import validate_int_value
validate_fields = {
"interval": {"lower_bound": 1, "upper_bound": None},
"retention_policy_value": {"lower_bound": 1, "upper_bound": 365},
# '-1' for 'NEVER' and '0' for 'ALWAYS'
"fullbackup_interval": {"lower_bound": -1, "upper_bound": 999},
}
class PolicyCommand(WorkloadmgrCommand):
resource = "workload_policy"
@staticmethod
def produce_policy_output(objects=None):
if objects:
utils.print_list(objects, ["ID", "Name", "Description", "Status"])
# print assigned project if only one policy to display
if len(objects) == 1 and objects[0].policy_assignments:
utils.print_list(
objects[0].policy_assignments, ["project_id", "project_name"]
)
return
class ListPolicy(PolicyCommand, lister.Lister):
"""List all available policies."""
def take_action(self, parsed_args):
client = self.get_client()
policy_objs = client.list() or []
headers = ["ID", "Name", "Description", "Status"]
columns = ["id", "name", "description", "status"]
return (
headers,
(osc_utils.get_item_properties(s, columns) for s in policy_objs),
)
class ShowPolicy(PolicyCommand, show.ShowOne):
"""Show a policy."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"policy_id", metavar="<policy_id>", help="ID of the policy."
)
def action_return(self, policy_obj):
if policy_obj:
data = {}
for field_value in policy_obj.field_values:
data[field_value["policy_field_name"]] = field_value["value"]
# TODO check for after project assignment
self.produce_policy_output([policy_obj])
return zip(*sorted(six.iteritems(data)))
def take_action(self, parsed_args):
client = self.get_client()
policy_obj = client.get(parsed_args.policy_id)
return self.action_return(policy_obj)
class CreatePolicy(ShowPolicy):
"""Creates a policy."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"display_name", metavar="<display_name>", help="policy name."
)
parser.add_argument(
"--policy-fields",
metavar="<key=key-name>",
action="append",
dest="policy_fields",
required=True,
default=[],
help="Specify following key value pairs for policy fields "
"Specify option multiple times to include multiple keys. "
" 'interval' : '1 hr' (must be positive number) "
" 'retention_policy_type' : 'Number of Snapshots to Keep' or 'Number of days to retain Snapshots'"
" 'retention_policy_value' : '30' (must be between 1 to 365) "
" 'fullbackup_interval' : '-1'"
"(Enter Number of incremental snapshots to take Full Backup between 1 to 999,"
" '-1' for 'NEVER' and '0' for 'ALWAYS')"
"For example --policy-fields retention_policy_type='Number of Snapshots to Keep'"
"--policy-fields interval='1 hr'"
"--policy-fields retention_policy_value='30'"
"--policy-fields fullbackup_interval='2'",
)
parser.add_argument(
"--display-description",
metavar="<display_description>",
help="Optional policy description. (Default=No description)",
default="No description",
)
parser.add_argument(
"--metadata",
metavar="<key=key-name>",
action="append",
dest="metadata",
default=[],
help="Specify a key value pairs to include in the policy create metadata "
"Specify option multiple times to include multiple keys. "
"key=value",
)
def take_action(self, parsed_args):
client = self.get_client()
try:
policy_fields = {}
err_msg = None
for policy_field_str in parsed_args.policy_fields:
for kv_str in policy_field_str.split(","):
try:
k, v = kv_str.split("=", 1)
if k in validate_fields:
value = v if "" not in v else v.split(" ")[0]
valid_value = validate_int_value(
value, validate_fields[k]["lower_bound"],
validate_fields[k]["upper_bound"]
)
if not valid_value:
err_msg = (
"Invalid value provided for policy_field '{}'"
" check help for more information"
)
raise exceptions.CommandError(err_msg.format(k))
except ValueError:
if not err_msg:
err_msg = (
"Invalid policy_field argument '{}'. "
"policy_field arguments must be of the "
"form --policy_field <key=value>"
)
raise exceptions.CommandError(err_msg.format(policy_field_str))
if k in policy_fields:
policy_fields[k] = v
else:
policy_fields.setdefault(k, v)
metadata = {}
for metadata_str in parsed_args.metadata:
for kv_str in metadata_str.split(","):
try:
k, v = kv_str.split("=", 1)
except ValueError:
err_msg = (
"Invalid metadata argument '%s'. metadata arguments must be of the "
"form --metadata <key=value>" % metadata_str
)
raise exceptions.CommandError(err_msg)
if k in metadata:
metadata[k] = v
else:
metadata.setdefault(k, v)
policy_obj = self._perform_operation(
client, parsed_args, policy_fields, metadata
)
return self.action_return(policy_obj)
except Exception as ex:
raise exceptions.CommandError(str(ex))
def _perform_operation(self, client, args, policy_fields, metadata):
return client.create(
args.display_name, args.display_description, policy_fields, metadata
)
class UpdatePolicy(CreatePolicy):
"""Update a policy."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"policy_id", metavar="<policy_id>", help="ID of the policy."
)
parser.add_argument(
"--display-name", metavar="<display-name>", help="policy name."
)
parser.add_argument(
"--display-description",
metavar="<display-description>",
help="Optional policy description.",
)
parser.add_argument(
"--policy-fields",
metavar="<key=key-name>",
action="append",
dest="policy_fields",
default=[],
help="Specify following key value pairs for policy fields "
"Specify option multiple times to include multiple keys. "
" 'interval' : '1 hr' (must be positive number) "
" 'retention_policy_type' : 'Number of Snapshots to Keep' or 'Number of days to retain Snapshots'"
" 'retention_policy_value' : '30' (must be between 1 to 365) "
" 'fullbackup_interval' : '-1' "
"(Enter Number of incremental snapshots to take Full Backup between 1 to 999,"
" '-1' for 'NEVER' and '0' for 'ALWAYS') "
"For example --policy-fields retention_policy_type='Number of Snapshots to Keep'"
"--policy-fields interval='1 hr'"
"--policy-fields retention_policy_value='30'"
"--policy-fields fullbackup_interval='2'",
)
parser.add_argument(
"--metadata",
metavar="<key=key-name>",
action="append",
dest="metadata",
default=[],
help="Specify a key value pairs to include in the policy update metadata "
"Specify option multiple times to include multiple keys. "
"key=value",
)
def _perform_operation(self, client, args, policy_fields, metadata):
return client.update(
args.policy_id,
args.display_name,
args.display_description,
policy_fields,
metadata,
)
class DeletePolicy(PolicyCommand):
"""Remove a policy."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"policy_id", metavar="<policy_id>", help="ID of the policy."
)
def take_action(self, parsed_args):
client = self.get_client()
client.delete(parsed_args.policy_id)
class AssignPolicy(ShowPolicy):
"""Assign/Remove policy to given projects."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"policy_id", metavar="<policy_id>", help="ID of the policy."
)
parser.add_argument(
"--add_project",
metavar="<project_id>",
action="append",
dest="add_project",
default=[],
help="ID of the projects to assign policy. "
"--add_project <project_id> --add_project <project_id>",
)
parser.add_argument(
"--remove_project",
metavar="<project_id>",
action="append",
dest="remove_project",
default=[],
help="ID of the projects to remove policy. "
"--remove_project <project_id> --remove_project <project_id>",
)
def take_action(self, parsed_args):
client = self.get_client()
result = client.assign(
parsed_args.policy_id, parsed_args.add_project, parsed_args.remove_project
)
if len(result["failed_ids"]) > 0:
msg = "Please verify failed project id's are valid"
utils.print_data_vertically([result["failed_ids"]], ["Failed_projects"])
raise exceptions.CommandError(msg)
else:
return self.action_return(result["policy"])
class ListAssignedPolicy(PolicyCommand, lister.Lister):
"""List assigned policies on given project."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"project_id",
metavar="<project_id>",
help="ID of the project to list assigned policies.",
)
def take_action(self, parsed_args):
client = self.get_client()
policy_objs = client.get_assigned_policies(parsed_args.project_id) or []
headers = ["ID", "Name", "Deleted", "CreatedAt"]
columns = ["policy_id", "policy_name", "deleted", "created_at"]
return (
headers,
(osc_utils.get_dict_properties(s, columns) for s in policy_objs),
)