Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import ast
import os
import six
import yaml
from cliff import show, lister
from osc_lib import exceptions
from osc_lib import utils as osc_utils
from osc_lib.cli import format_columns

from workloadmgrclient import timezone
from workloadmgrclient import utils
from workloadmgrclient.openstack.common import strutils
from workloadmgrclient.v1 import WorkloadmgrCommand


# Absolute path of the sample restore specification: "input-files/restore.json"
# in the directory one level above this module. It is the default value of the
# --filename option of the file-driven restore commands in this module.
restore_json_path = os.path.abspath(
    os.path.join(
        os.path.dirname(__file__), os.pardir, "input-files", "restore.json"
    )
)


class SnapshotCommand(WorkloadmgrCommand):
    # Common base class for the snapshot commands defined in this module.
    # NOTE(review): `resource` is presumably what WorkloadmgrCommand uses to
    # pick the API manager returned by get_client() - confirm against
    # workloadmgrclient.v1.
    resource = "snapshots"


class ListSnapshot(SnapshotCommand, lister.Lister):
    """List all the snapshots."""

    @staticmethod
    def _add_arguments(parser):
        """Register the optional filters accepted by the snapshot listing."""
        parser.add_argument(
            "--workload_id",
            metavar="<workload_id>",
            default=None,
            help="Filter results by workload_id",
        )
        parser.add_argument(
            "--tvault_node",
            metavar="<host>",
            help="List all the snapshot operations scheduled on a tvault node(Default=None)",
            default=None,
        )
        # Adjacent string literals are concatenated verbatim, so every
        # fragment carries its own trailing space.
        parser.add_argument(
            "--date_from",
            metavar="<date_from>",
            help="From date in format 'YYYY-MM-DDTHH:MM:SS' eg 2016-10-10T00:00:00, "
            "If don't specify time then it takes 00:00 by default",
            default=None,
        )
        parser.add_argument(
            "--date_to",
            metavar="<date_to>",
            help="To date in format 'YYYY-MM-DDTHH:MM:SS'(default is current day), "
            "Specify HH:MM:SS to get snapshots within same day inclusive/exclusive results "
            "for date_from and date_to",
            default=None,
        )
        parser.add_argument(
            "--all",
            type=strutils.bool_from_string,
            metavar="{True,False}",
            help="List all snapshots of all the projects(valid for admin user only)",
            default=False,
        )

    def take_action(self, parsed_args):
        """Query the snapshots matching the CLI filters.

        Returns a ``(headers, rows)`` tuple where ``rows`` is a lazy iterable
        with one tuple of column values per snapshot.
        """
        client = self.get_client()
        # --tvault_node is sent to the API under the "host" key.
        search_opts = {
            "host": parsed_args.tvault_node,
            "workload_id": parsed_args.workload_id,
            "all": parsed_args.all,
            "date_from": parsed_args.date_from,
            "date_to": parsed_args.date_to,
        }
        # A falsy result from the client is treated as "no snapshots".
        snapshot_objs = client.list(search_opts=search_opts) or []
        headers = [
            "Created At",
            "Name",
            "ID",
            "Workload ID",
            "Snapshot Type",
            "Status",
            "Host",
        ]
        # Attribute names read from every snapshot object, in header order.
        columns = [
            "created_at",
            "name",
            "id",
            "workload_id",
            "snapshot_type",
            "status",
            "host",
        ]
        return (
            headers,
            (osc_utils.get_item_properties(s, columns) for s in snapshot_objs),
        )


class SetSnapshot(SnapshotCommand, show.ShowOne):
    """Reset Snapshot Status."""

    @staticmethod
    def _add_arguments(parser):
        """Register the option naming the snapshot whose status is reset."""
        parser.add_argument(
            "--snapshot_id",
            metavar="<snapshot_id>",
            default=None,
            help="set status to this snapshot_id",
        )

    def take_action(self, parsed_args):
        """Ask the service to set the snapshot status and show the result.

        The requested status is always "error". Returns ``(columns, data)``
        built from the "snapshot" entry of the response; when the response is
        empty or carries no "snapshot" entry every value is an empty string.
        """
        client = self.get_client()
        search_opts = {
            "snapshot_id": parsed_args.snapshot_id,
            "status": "error",
        }
        # Fall back to empty mappings so that a falsy response or a missing
        # "snapshot" key yields blank values instead of an AttributeError /
        # TypeError further down.
        response = client.set(search_opts=search_opts) or {}
        snapshot = response.get("snapshot") or {}
        columns = [
            "name",
            "id",
            "workload_id",
            "snapshot_type",
            "status",
        ]
        data = osc_utils.get_dict_properties(snapshot, columns)
        return columns, data


class ShowSnapshot(SnapshotCommand, show.ShowOne):
    """Show details about a workload snapshot"""

    @staticmethod
    def _add_arguments(parser):
        """Register the snapshot ID and the optional extra-detail selector."""
        parser.add_argument(
            "snapshot_id", metavar="<snapshot_id>", help="ID of the workload snapshot."
        )
        parser.add_argument(
            "--output",
            metavar="<output>",
            default=None,
            help="Option to get additional snapshot details, "
            "Specify --output metadata for snapshot metadata, "
            "Specify --output networks for snapshot vms networks, "
            "Specify --output disks for snapshot vms disks",
        )

    def take_action(self, parsed_args):
        """Print the snapshot's instances and return its remaining details.

        The instance table is always printed; ``--output`` additionally prints
        the networks, disks or metadata table. Returns ``(columns, data)`` for
        the snapshot fields that remain after filtering.
        """
        client = self.get_client()
        snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)

        # Work on a shallow copy so top-level keys can be removed freely.
        info = dict(snapshot_obj._info)
        info.pop("links", None)
        instances = info.pop("instances", [])
        metadata = info.pop("metadata", [])

        inst = []
        networks = []
        vdisks = []
        for item in instances:
            d = dict()
            d["Name"] = str(item["name"])
            item_meta = item["metadata"]
            if "imported_from_vcenter" in item_meta:
                # The fields below are parsed from Python-literal strings
                # stored in the instance metadata.
                for ds in ast.literal_eval(item_meta["datastores"]):
                    if ds["_type"] == "Datastore":
                        d["Datastore"] = ds["name"]
                d["Resource Pool"] = ast.literal_eval(item_meta["resourcepool"])[
                    "name"
                ]
                d["VM Folder"] = ast.literal_eval(item_meta["parent"])["name"]
                for cl in ast.literal_eval(item_meta["cluster"]):
                    if cl["_type"] == "Cluster":
                        d["Cluster"] = cl["name"]
                for hs in ast.literal_eval(item_meta["host"]):
                    if hs["_type"] == "Host":
                        d["Host"] = hs["name"]
                item_networks = ast.literal_eval(item_meta["networks"])
                networks += item_networks
                vdisks += ast.literal_eval(item_meta["vdisks"])
                # Only this instance's networks are inspected; scanning the
                # cumulative list would attribute another VM's network to
                # this one and trip over entries that have no "_type".
                for nt in item_networks:
                    if nt["_type"] == "Network":
                        d["Network"] = nt["name"]
                d["VM Power State"] = str(item["status"])
            else:
                d["Status"] = str(item["status"])
                # TODO: nics is a long string... need to handle correctly
                # d['NICs'] = str(item['nics'])
                if "flavor" in item:
                    d["Flavor"] = str(item["flavor"])
                if "security_group" in item:
                    d["Security Group"] = str(item["security_group"])
                d["ID"] = str(item["id"])

                # Tag every nic/disk with its owning VM so the flattened
                # tables printed below can still be related to an instance.
                for nic in item["nics"]:
                    nic["vm_id"] = item["id"]
                for disk in item["vdisks"]:
                    disk["vm_id"] = item["id"]

                # Kept from the original: the lists are round-tripped through
                # their string form before being accumulated.
                networks += ast.literal_eval(str(item["nics"]))
                vdisks += ast.literal_eval(str(item["vdisks"]))

            inst.append(d)

        # Snapshot metadata arrives as {"key": ..., "value": ...} records and
        # is turned into one single-entry dict per record.
        meta = [{str(item["key"]): str(item["value"])} for item in metadata]

        for size_key in ("size", "restore_size"):
            info[size_key] = (
                str(info[size_key])
                + " Bytes or Approx ("
                + utils.bytes_fmt(info[size_key])
                + ")"
            )

        info["time_taken"] = str(info["time_taken"]) + " Seconds"

        # Fields that are never displayed; tolerate responses lacking them.
        for hidden in (
            "pinned",
            "created_at",
            "finished_at",
            "updated_at",
            "user_id",
            "project_id",
        ):
            info.pop(hidden, None)

        status = info.get("status")
        if not info.get("warning_msg"):
            info.pop("warning_msg", None)

        if status != "error":
            info.pop("error_msg", None)

        if status in ("available", "error", "mounted"):
            info.pop("progress_msg", None)

        if status != "available":
            info.pop("restore_size", None)

        if status == "mounted":
            # Surface the mount URL (stored in the metadata) in the main table.
            for m in meta:
                if "mounturl" in m:
                    info["mounturl"] = m["mounturl"]
                    break

        utils.print_data_vertically(inst, ["Instances", "Value"])
        if parsed_args.output == "networks":
            utils.print_data_vertically(networks, ["Networks", "Value"])
        elif parsed_args.output == "disks":
            utils.print_data_vertically(vdisks, ["Vdisks", "Value"])
        elif parsed_args.output == "metadata":
            utils.print_data_vertically(meta, ["Metadata", "Value"])

        columns = list(info.keys())
        data = osc_utils.get_dict_properties(info, columns)
        return columns, data


class CancelSnapshot(SnapshotCommand):
    """Cancel a snapshot."""

    @staticmethod
    def _add_arguments(parser):
        """Register the positional ID of the snapshot to cancel."""
        parser.add_argument(
            "snapshot_id",
            help="ID of snapshot to cancel.",
            metavar="<snapshot_id>",
        )

    def take_action(self, parsed_args):
        """Resolve the snapshot named on the command line and cancel it."""
        target = utils.find_resource(self.get_client(), parsed_args.snapshot_id)
        target.cancel()


class DeleteSnapshot(SnapshotCommand):
    """Remove a workload snapshot."""

    @staticmethod
    def _add_arguments(parser):
        """Register the positional ID of the snapshot to delete."""
        parser.add_argument(
            "snapshot_id",
            help="ID of the workload snapshot to delete.",
            metavar="<snapshot_id>",
        )

    def take_action(self, parsed_args):
        """Resolve the snapshot named on the command line and delete it."""
        target = utils.find_resource(self.get_client(), parsed_args.snapshot_id)
        target.delete()


class ListMountedSnapshot(SnapshotCommand, lister.Lister):
    """List of all mounted snapshots"""

    @staticmethod
    def _add_arguments(parser):
        """Register the optional workload filter."""
        parser.add_argument(
            "--workloadid",
            metavar="<workloadid>",
            help="Workload id (Default=None)",
            default=None,
        )

    def take_action(self, parsed_args):
        """Return the mounted snapshots as ``(columns, rows)``.

        ``rows`` is a lazy iterable with one tuple per entry of the
        response's "mounted_snapshots" list. When the response is empty or
        has no such entries the same columns are returned with no rows, so
        the caller always receives column names and an iterable.
        """
        client = self.get_client()
        response = client.snapshot_mounted_list(parsed_args.workloadid) or {}
        mounted = response.get("mounted_snapshots") or []
        # The response keys double as the displayed column headers.
        columns = [
            "snapshot_id",
            "snapshot_name",
            "workload_id",
            "mounturl",
            "status",
        ]
        return (
            columns,
            (osc_utils.get_dict_properties(snap, columns) for snap in mounted),
        )


class MountSnapshot(SnapshotCommand):
    """Mount a workload snapshot."""

    @staticmethod
    def _add_arguments(parser):
        """Register the snapshot ID, the target VM ID and the mount options."""
        parser.add_argument(
            "snapshot_id",
            metavar="<snapshot_id>",
            help="ID of the workload snapshot to mount.",
        )
        parser.add_argument(
            "mount_vm_id",
            metavar="<mount_vm_id>",
            help="VM ID that snapshot volumes mount to.",
        )
        parser.add_argument(
            "--options",
            metavar="<options>",
            help="Mount options. (Default={})",
            default="{}",
        )

    def take_action(self, parsed_args):
        """Mount the snapshot on the given VM and print a follow-up hint.

        Raises:
            exceptions.CommandError: if ``--options`` is not a valid Python
                literal.
        """
        # Validate the locally supplied options first so that a typo is
        # reported as a CLI error before any request is sent.
        try:
            options = ast.literal_eval(parsed_args.options)
        except (ValueError, SyntaxError, TypeError) as ex:
            raise exceptions.CommandError(
                "Invalid --options value {!r}: {}".format(parsed_args.options, ex)
            )

        client = self.get_client()
        snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)
        snapshot_obj.mount(parsed_args.mount_vm_id, options)
        print(
            'Please run "workloadmgr snapshot-show --output metadata %s" to '
            "get the snapshot status" % parsed_args.snapshot_id
        )
        return


class DismountSnapshot(SnapshotCommand):
    """Dismount a workload snapshot."""

    @staticmethod
    def _add_arguments(parser):
        """Register the positional ID of the snapshot to dismount."""
        parser.add_argument(
            "snapshot_id",
            help="ID of the workload snapshot to dismount.",
            metavar="<snapshot_id>",
        )

    def take_action(self, parsed_args):
        """Resolve the snapshot named on the command line and dismount it."""
        target = utils.find_resource(self.get_client(), parsed_args.snapshot_id)
        target.dismount()


class InplaceRestoreSnapshot(SnapshotCommand):
    """Inplace restore of workload snapshot."""

    @staticmethod
    def _add_arguments(parser):
        """Register the snapshot ID, display fields and restore file."""
        parser.add_argument(
            "snapshot_id",
            metavar="<snapshot_id>",
            help="ID of the workload snapshot to restore.",
        )
        parser.add_argument(
            "--display-name",
            metavar="<display-name>",
            help="Optional name for the restore. (Default=None)",
        )
        parser.add_argument(
            "--display-description",
            metavar="<display-description>",
            help="Optional description for restore. (Default=None)",
        )
        # argparse applies `type` to string defaults as well, so the default
        # path is opened just like a user-supplied one.
        parser.add_argument(
            "--filename",
            metavar="<filename>",
            type=open,
            default=restore_json_path,
            help="Provide file path(relative or absolute) including file name, "
            "by default it will read file: %s. "
            "You can use this for reference or replace values into"
            " this file." % restore_json_path,
        )

    def take_action(self, parsed_args):
        """Start an in-place restore driven by the restore file.

        The name and description come from the command line first, then from
        the file's "name"/"description" entries, then default to
        "Inplace Restore".

        Raises:
            exceptions.CommandError: if the file cannot be parsed or does not
                contain a mapping.
        """
        file_data = utils.read_file(parsed_args.filename)
        try:
            json_data = yaml.load(file_data, Loader=yaml.SafeLoader)
        except Exception as ex:
            # Any parser failure is reported as a CLI error; YAML errors
            # expose the useful detail on their `context` attribute.
            raise exceptions.CommandError(
                "JSON conversion failed with error: {}".format(
                    getattr(ex, "context", ex)
                )
            )

        # An empty file parses to None and a bare list/scalar is not a valid
        # restore spec either; fail clearly instead of on `.get` below.
        if not isinstance(json_data, dict):
            raise exceptions.CommandError(
                "Restore file must contain a JSON object, got: {}".format(
                    type(json_data).__name__
                )
            )

        name = (
            parsed_args.display_name
            or json_data.get("name")
            or "Inplace Restore"
        )
        description = (
            parsed_args.display_description
            or json_data.get("description")
            or "Inplace Restore"
        )

        client = self.get_client()
        snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)
        snapshot_obj.restore(False, name, description, json_data)


class OneclickRestoreSnapshot(SnapshotCommand):
    """Oneclick restore of workload snapshot."""

    @staticmethod
    def _add_arguments(parser):
        """Register the snapshot ID and the optional display fields."""
        parser.add_argument(
            "snapshot_id",
            metavar="<snapshot_id>",
            help="ID of the workload snapshot to restore.",
        )
        parser.add_argument(
            "--display-name",
            metavar="<display-name>",
            help="Optional name for the restore. (Default='One Click Restore')",
            default="One Click Restore",
        )
        parser.add_argument(
            "--display-description",
            metavar="<display-description>",
            help="Optional description for restore. (Default='One Click Restore')",
            default="One Click Restore",
        )

    def take_action(self, parsed_args):
        """Start a one-click restore of the snapshot with fixed options."""
        client = self.get_client()
        snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)
        # Fixed option set for this restore type, written as a plain dict
        # literal rather than parsed from a constant string.
        options = {
            "openstack": {},
            "type": "openstack",
            "oneclickrestore": True,
            "restore_type": "oneclick",
            "vmware": {},
        }
        snapshot_obj.restore(
            False, parsed_args.display_name, parsed_args.display_description, options
        )


class SelectiveRestoreSnapshot(SnapshotCommand):
    """Selective restore of workload snapshot."""

    @staticmethod
    def _add_arguments(parser):
        """Register the snapshot ID, display fields and restore file."""
        parser.add_argument(
            "snapshot_id",
            metavar="<snapshot_id>",
            help="ID of the workload snapshot to restore.",
        )
        parser.add_argument(
            "--display-name",
            metavar="<display-name>",
            help="Optional name for the restore. (Default=None)",
        )
        parser.add_argument(
            "--display-description",
            metavar="<display-description>",
            help="Optional description for restore. (Default=None)",
        )
        # argparse applies `type` to string defaults as well, so the default
        # path is opened just like a user-supplied one.
        parser.add_argument(
            "--filename",
            metavar="<filename>",
            type=open,
            default=restore_json_path,
            help="Provide file path(relative or absolute) including file name, "
            "by default it will read file: %s. "
            "You can use this for reference or replace values into"
            " this file." % restore_json_path,
        )

    def take_action(self, parsed_args):
        """Start a selective restore driven by the restore file.

        The name and description come from the command line first, then from
        the file's "name"/"description" entries, then default to
        "Selective Restore".

        Raises:
            exceptions.CommandError: if the file cannot be parsed or does not
                contain a mapping.
        """
        file_data = utils.read_file(parsed_args.filename)
        try:
            json_data = yaml.load(file_data, Loader=yaml.SafeLoader)
        except Exception as ex:
            # Any parser failure is reported as a CLI error; YAML errors
            # expose the useful detail on their `context` attribute.
            raise exceptions.CommandError(
                "JSON conversion failed with error: {}".format(
                    getattr(ex, "context", ex)
                )
            )

        # An empty file parses to None and a bare list/scalar is not a valid
        # restore spec either; fail clearly instead of on `.get` below.
        if not isinstance(json_data, dict):
            raise exceptions.CommandError(
                "Restore file must contain a JSON object, got: {}".format(
                    type(json_data).__name__
                )
            )

        name = (
            parsed_args.display_name
            or json_data.get("name")
            or "Selective Restore"
        )
        description = (
            parsed_args.display_description
            or json_data.get("description")
            or "Selective Restore"
        )

        client = self.get_client()
        snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)
        snapshot_obj.restore(False, name, description, json_data)


class RestoreNetworkTopology(SnapshotCommand):
    """Restores only network topology from a snapshot."""

    @staticmethod
    def _add_arguments(parser):
        """Register the snapshot ID and the optional display fields."""
        parser.add_argument(
            "snapshot_id",
            help="ID of a snapshot, which network topology needs to be restored.",
            metavar="<snapshot_id>",
        )
        parser.add_argument(
            "--display-name",
            default="Network Topology Restore",
            help="Optional name for the restore.",
            metavar="<display-name>",
        )
        parser.add_argument(
            "--display-description",
            default="Network Topology Restore",
            help="Optional description for restore.",
            metavar="<display-description>",
        )

    def take_action(self, parsed_args):
        """Resolve the snapshot and request a network topology restore."""
        target = utils.find_resource(self.get_client(), parsed_args.snapshot_id)
        restore_name = parsed_args.display_name
        restore_description = parsed_args.display_description
        target.restore_network_topology(restore_name, restore_description)


class RestoreSecurityGroups(SnapshotCommand):
    """Restores only security groups from a snapshot."""

    @staticmethod
    def _add_arguments(parser):
        """Register the snapshot ID and the optional display fields."""
        parser.add_argument(
            "snapshot_id",
            help="ID of a snapshot, which security groups needs to be restored.",
            metavar="<snapshot_id>",
        )
        parser.add_argument(
            "--display-name",
            default="Security Groups Restore",
            help="Optional name for the restore.",
            metavar="<display-name>",
        )
        parser.add_argument(
            "--display-description",
            default="Security Groups Restore",
            help="Optional description for restore.",
            metavar="<display-description>",
        )

    def take_action(self, parsed_args):
        """Resolve the snapshot and request a security groups restore."""
        target = utils.find_resource(self.get_client(), parsed_args.snapshot_id)
        restore_name = parsed_args.display_name
        restore_description = parsed_args.display_description
        target.restore_security_groups(restore_name, restore_description)