Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
Size: Mime:
import six
from cliff import show, lister
from osc_lib import exceptions
from osc_lib import utils as osc_utils

from workloadmgrclient import utils
from workloadmgrclient.v1 import WorkloadmgrCommand


class PolicyCommand(WorkloadmgrCommand):
    # Shared base for the policy commands defined in this module: it fixes
    # the resource name and offers a helper that prints policy objects.
    resource = "workload_policy"

    @staticmethod
    def produce_policy_output(objects=None):
        """Print policy objects through ``utils.print_list``.

        Nothing is printed when ``objects`` is ``None`` or empty. When exactly
        one policy is given and its ``policy_assignments`` attribute is
        truthy, those assignments are printed as a second list.

        :param objects: sequence of policy objects, or ``None``.
        :returns: ``None``.
        """
        if not objects:
            return

        utils.print_list(objects, ["ID", "Name", "Description", "Status"])

        # Assigned projects are only shown when a single policy is displayed.
        if len(objects) != 1:
            return
        assignments = objects[0].policy_assignments
        if assignments:
            utils.print_list(assignments, ["project_id", "project_name"])


class ListPolicy(PolicyCommand, lister.Lister):
    """List all available policies."""

    def take_action(self, parsed_args):
        """Fetch the policies from the client and describe them as rows.

        :param parsed_args: parsed command-line arguments (not read here).
        :returns: two-tuple of the column headers and a generator yielding,
            for each policy, the result of
            ``osc_utils.get_item_properties`` for the ``id``, ``name``,
            ``description`` and ``status`` attributes.
        """
        attribute_names = ["id", "name", "description", "status"]
        column_titles = ["ID", "Name", "Description", "Status"]

        # A falsy result from the client is treated as "no policies".
        policies = self.get_client().list() or []
        rows = (
            osc_utils.get_item_properties(policy, attribute_names)
            for policy in policies
        )
        return column_titles, rows


class ShowPolicy(PolicyCommand, show.ShowOne):
    """Show a policy."""

    @staticmethod
    def _add_arguments(parser):
        """Register the positional ``policy_id`` argument on ``parser``."""
        parser.add_argument(
            "policy_id", metavar="<policy_id>", help="ID of the policy."
        )

    def action_return(self, policy_obj):
        """Print the policy and return its field values for display.

        The policy is printed through ``produce_policy_output``; its
        ``field_values`` entries (dicts with ``policy_field_name`` and
        ``value`` keys) are returned sorted by field name.

        :param policy_obj: policy object exposing ``field_values``, or a
            falsy value when there is nothing to show.
        :returns: two-tuple ``(field_names, field_values)``. Both members
            are empty tuples when ``policy_obj`` is falsy or has no field
            values, so the caller can always unpack the result into two
            items.
        """
        if not policy_obj:
            # Returning None here would break callers that unpack the
            # result into (columns, data).
            return (), ()

        data = {}
        for field_value in policy_obj.field_values:
            data[field_value["policy_field_name"]] = field_value["value"]
        # TODO check for after project assignment
        self.produce_policy_output([policy_obj])

        if not data:
            # zip(*[]) yields nothing at all, which cannot be unpacked into
            # two items; hand back an explicit empty pair instead.
            return (), ()

        field_names, values = zip(*sorted(six.iteritems(data)))
        return field_names, values

    def take_action(self, parsed_args):
        """Fetch the policy given by ``parsed_args.policy_id`` and show it.

        :returns: the result of ``action_return`` for the fetched policy.
        """
        client = self.get_client()
        policy_obj = client.get(parsed_args.policy_id)
        return self.action_return(policy_obj)


class CreatePolicy(ShowPolicy):
    """Creates a policy."""

    @staticmethod
    def _add_arguments(parser):
        """Register the arguments of the create command on ``parser``.

        Adds the positional ``display_name``, the required repeatable
        ``--policy-fields`` option, and the optional
        ``--display-description`` and repeatable ``--metadata`` options.
        """
        parser.add_argument(
            "display_name", metavar="<display_name>", help="policy name."
        )
        # Adjacent string literals are concatenated verbatim, so every
        # fragment of the help text must carry its own separating space.
        parser.add_argument(
            "--policy-fields",
            metavar="<key=key-name>",
            action="append",
            dest="policy_fields",
            required=True,
            default=[],
            help="Specify following key value pairs for policy fields "
            "Specify option multiple times to include multiple keys. "
            " 'interval' : '1 hr' "
            " 'retention_policy_type' : 'Number of Snapshots to Keep' or 'Number of days to retain Snapshots'"
            " 'retention_policy_value' : '30' "
            " 'fullbackup_interval' : '-1' "
            "(Enter Number of incremental snapshots to take Full Backup between 1 to 999,"
            " '-1' for 'NEVER' and '0' for 'ALWAYS') "
            "For example --policy-fields retention_policy_type='Number of Snapshots to Keep'"
            " --policy-fields interval='1 hr'"
            " --policy-fields retention_policy_value='30'"
            " --policy-fields fullbackup_interval='2'",
        )
        parser.add_argument(
            "--display-description",
            metavar="<display_description>",
            help="Optional policy description. (Default=No description)",
            default="No description",
        )
        parser.add_argument(
            "--metadata",
            metavar="<key=key-name>",
            action="append",
            dest="metadata",
            default=[],
            help="Specify a key value pairs to include in the workload_type metadata "
            "Specify option multiple times to include multiple keys. "
            "key=value",
        )

    @staticmethod
    def _parse_key_value_args(raw_values, option_name):
        """Convert repeated ``key=value`` option values into a dict.

        Each entry of ``raw_values`` may hold several comma-separated
        pairs. Only the first ``=`` of a pair separates key from value, and
        a later occurrence of a key overrides an earlier one.

        :param raw_values: list of strings collected for the option.
        :param option_name: option name without the leading dashes; used
            only to build the error message.
        :returns: dict mapping each key to its value.
        :raises exceptions.CommandError: if a pair contains no ``=``.
        """
        parsed = {}
        for raw_value in raw_values:
            for pair in raw_value.split(","):
                key, separator, value = pair.partition("=")
                if not separator:
                    raise exceptions.CommandError(
                        "Invalid %(name)s argument '%(arg)s'. %(name)s "
                        "arguments must be of the form --%(name)s <key=value>"
                        % {"name": option_name, "arg": raw_value}
                    )
                parsed[key] = value
        return parsed

    def take_action(self, parsed_args):
        """Parse the key/value options, run the operation and show the result.

        :param parsed_args: parsed command-line arguments; reads
            ``policy_fields`` and ``metadata`` and forwards the namespace
            to ``_perform_operation``.
        :returns: the result of ``action_return`` for the policy returned
            by ``_perform_operation``.
        :raises exceptions.CommandError: on malformed key/value arguments
            or when any other exception occurs during the operation.
        """
        client = self.get_client()
        try:
            policy_fields = self._parse_key_value_args(
                parsed_args.policy_fields, "policy-fields"
            )
            metadata = self._parse_key_value_args(
                parsed_args.metadata, "metadata"
            )
            policy_obj = self._perform_operation(
                client, parsed_args, policy_fields, metadata
            )
            return self.action_return(policy_obj)
        except exceptions.CommandError:
            # Already the error type reported to the user; no re-wrapping.
            raise
        except Exception as ex:
            raise exceptions.CommandError(str(ex))

    def _perform_operation(self, client, args, policy_fields, metadata):
        """Create the policy through ``client.create`` and return its result.

        Kept separate from ``take_action`` so that a subclass can supply a
        different client call while reusing the argument parsing.
        """
        return client.create(
            args.display_name, args.display_description, policy_fields, metadata
        )


class UpdatePolicy(CreatePolicy):
    """Update a policy."""

    @staticmethod
    def _add_arguments(parser):
        """Register the arguments of the update command on ``parser``.

        Adds the positional ``policy_id`` and the optional
        ``--display-name``, ``--display-description``, repeatable
        ``--policy-fields`` and repeatable ``--metadata`` options.
        """
        parser.add_argument(
            "policy_id", metavar="<policy_id>", help="ID of the policy."
        )
        parser.add_argument(
            "--display-name", metavar="<display-name>", help="policy name."
        )
        parser.add_argument(
            "--display-description",
            metavar="<display-description>",
            help="Optional policy description.",
        )
        # Adjacent string literals are concatenated verbatim, so every
        # fragment of the help text must carry its own separating space.
        parser.add_argument(
            "--policy-fields",
            metavar="<key=key-name>",
            action="append",
            dest="policy_fields",
            default=[],
            help="Specify following key value pairs for policy fields "
            "Specify option multiple times to include multiple keys. "
            " 'interval' : '1 hr' "
            " 'retention_policy_type' : 'Number of Snapshots to Keep' or 'Number of days to retain Snapshots'"
            " 'retention_policy_value' : '30' "
            " 'fullbackup_interval' : '-1' "
            "(Enter Number of incremental snapshots to take Full Backup between 1 to 999,"
            " '-1' for 'NEVER' and '0' for 'ALWAYS') "
            "For example --policy-fields retention_policy_type='Number of Snapshots to Keep'"
            " --policy-fields interval='1 hr'"
            " --policy-fields retention_policy_value='30'"
            " --policy-fields fullbackup_interval='2'",
        )
        parser.add_argument(
            "--metadata",
            metavar="<key=key-name>",
            action="append",
            dest="metadata",
            default=[],
            help="Specify a key value pairs to include in the workload_type metadata "
            "Specify option multiple times to include multiple keys. "
            "key=value",
        )

    def _perform_operation(self, client, args, policy_fields, metadata):
        """Update the policy through ``client.update`` and return its result.

        ``args.display_name`` and ``args.display_description`` are passed
        on as parsed; no default is declared for them in
        ``_add_arguments``.
        """
        return client.update(
            args.policy_id,
            args.display_name,
            args.display_description,
            policy_fields,
            metadata,
        )


class DeletePolicy(PolicyCommand):
    """Remove a policy."""

    @staticmethod
    def _add_arguments(parser):
        """Register the positional ``policy_id`` argument on ``parser``."""
        parser.add_argument(
            "policy_id",
            help="ID of the policy.",
            metavar="<policy_id>",
        )

    def take_action(self, parsed_args):
        """Ask the client to delete the policy ``parsed_args.policy_id``.

        The client's return value is discarded; this method returns
        ``None``.
        """
        self.get_client().delete(parsed_args.policy_id)


class AssignPolicy(ShowPolicy):
    """Assign/Remove policy to given projects."""

    @staticmethod
    def _add_arguments(parser):
        """Register ``policy_id`` and the repeatable project options."""
        parser.add_argument(
            "policy_id", metavar="<policy_id>", help="ID of the policy."
        )
        parser.add_argument(
            "--add_project",
            dest="add_project",
            action="append",
            default=[],
            metavar="<project_id>",
            help="ID of the projects to assign policy. "
            "--add_project <project_id> --add_project <project_id>",
        )
        parser.add_argument(
            "--remove_project",
            dest="remove_project",
            action="append",
            default=[],
            metavar="<project_id>",
            help="ID of the projects to remove policy. "
            "--remove_project <project_id> --remove_project <project_id>",
        )

    def take_action(self, parsed_args):
        """Apply the requested project assignments to the policy.

        Calls ``client.assign`` with the policy id and the lists of
        projects to add and remove, then inspects the ``failed_ids`` entry
        of the result.

        :returns: the result of ``action_return`` for ``result["policy"]``
            when no project id failed.
        :raises exceptions.CommandError: when ``failed_ids`` is non-empty;
            the failed ids are printed first.
        """
        outcome = self.get_client().assign(
            parsed_args.policy_id,
            parsed_args.add_project,
            parsed_args.remove_project,
        )

        failed_ids = outcome["failed_ids"]
        if len(failed_ids) == 0:
            return self.action_return(outcome["policy"])

        # Show the ids that could not be processed before failing.
        utils.print_data_vertically([failed_ids], ["Failed_projects"])
        raise exceptions.CommandError(
            "Please verify failed project id's are valid"
        )


class ListAssignedPolicy(PolicyCommand, lister.Lister):
    """List assigned policies on given project."""

    @staticmethod
    def _add_arguments(parser):
        """Register the positional ``project_id`` argument on ``parser``."""
        parser.add_argument(
            "project_id",
            help="ID of the project to list assigned policies.",
            metavar="<project_id>",
        )

    def take_action(self, parsed_args):
        """Fetch the policies assigned to a project and describe them as rows.

        :param parsed_args: parsed command-line arguments; reads
            ``project_id``.
        :returns: two-tuple of the column headers and a generator yielding,
            for each assigned policy, the result of
            ``osc_utils.get_dict_properties`` for the ``policy_id``,
            ``policy_name``, ``deleted`` and ``created_at`` keys.
        """
        assigned = self.get_client().get_assigned_policies(
            parsed_args.project_id
        )
        # A falsy result from the client is treated as "no assignments".
        assigned = assigned or []

        column_titles = ["ID", "Name", "Deleted", "CreatedAt"]
        keys = ["policy_id", "policy_name", "deleted", "created_at"]
        rows = (osc_utils.get_dict_properties(entry, keys) for entry in assigned)
        return column_titles, rows