Repository URL to install this package:
Version:
5.2.8 ▾
|
python3-workloadmgrclient
/
usr
/
lib
/
python3
/
dist-packages
/
workloadmgrclient
/
v1
/
snapshot.py
|
---|
import ast
import os
import six
import yaml
from cliff import show, lister
from osc_lib import exceptions
from osc_lib import utils as osc_utils
from osc_lib.cli import format_columns
from workloadmgrclient import timezone
from workloadmgrclient import utils
from workloadmgrclient.openstack.common import strutils
from workloadmgrclient.v1 import WorkloadmgrCommand
restore_json_path = os.path.abspath(
(os.path.join(os.path.dirname(__file__), "../input-files/restore.json"))
)
class SnapshotCommand(WorkloadmgrCommand):
resource = "snapshots"
class ListSnapshot(SnapshotCommand, lister.Lister):
"""List all the workloads."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"--workload_id",
metavar="<workload_id>",
default=None,
help="Filter results by workload_id",
)
parser.add_argument(
"--tvault_node",
metavar="<host>",
help="List all the snapshot operations scheduled on a tvault node(Default=None)",
default=None,
)
parser.add_argument(
"--date_from",
metavar="<date_from>",
help="From date in format 'YYYY-MM-DDTHH:MM:SS' eg 2016-10-10T00:00:00,"
"If don't specify time then it takes 00:00 by default",
default=None,
)
parser.add_argument(
"--date_to",
metavar="<date_to>",
help="To date in format 'YYYY-MM-DDTHH:MM:SS'(defult is current day),"
"Specify HH:MM:SS to get snapshots within same day inclusive/exclusive results"
"for date_from and date_to",
default=None,
)
parser.add_argument(
"--all",
type=strutils.bool_from_string,
metavar="{True,False}",
help="List all snapshots of all the projects(valid for admin user only)",
default=False,
)
def take_action(self, parsed_args):
client = self.get_client()
search_opts = {
"host": parsed_args.tvault_node,
"workload_id": parsed_args.workload_id,
"all": parsed_args.all,
"date_from": parsed_args.date_from,
"date_to": parsed_args.date_to,
}
snapshot_objs = client.list(search_opts=search_opts) or []
headers = [
"Created At",
"Name",
"ID",
"Workload ID",
"Snapshot Type",
"Status",
"Host",
]
columns = [
"created at",
"name",
"id",
"workload_id",
"snapshot_type",
"status",
"host",
]
return (
headers,
(osc_utils.get_item_properties(s, columns) for s in snapshot_objs),
)
class SetSnapshot(SnapshotCommand, show.ShowOne):
"""Reset Snapshot Status."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"--snapshot_id",
metavar="<snapshot_id>",
default=None,
help="set status to this snapshot_id",
)
def take_action(self, parsed_args):
client = self.get_client()
search_opts = {
"snapshot_id": parsed_args.snapshot_id,
"status": "error"
}
snapshot_objs = client.set(search_opts=search_opts) or []
headers = [
"Name",
"ID",
"Snapshot ID",
"Snapshot Type",
"Status"
]
columns = [
"name",
"id",
"workload_id",
"snapshot_type",
"status"
]
data = osc_utils.get_dict_properties(snapshot_objs.get('snapshot'), columns)
return columns, data
class ShowSnapshot(SnapshotCommand, show.ShowOne):
"""Show details about a workload snapshot"""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"snapshot_id", metavar="<snapshot_id>", help="ID of the workload snapshot."
)
parser.add_argument(
"--output",
metavar="<output>",
default=None,
help="Option to get additional snapshot details, "
"Specify --output metadata for snapshot metadata, "
"Specify --output networks for snapshot vms networks, "
"Specify --output disks for snapshot vms disks",
)
def take_action(self, parsed_args):
client = self.get_client()
snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)
info = dict()
instances = dict()
metadata = dict()
info.update(snapshot_obj._info)
if "links" in info:
info.pop("links")
if "instances" in info:
instances = info.pop("instances")
if "metadata" in info:
metadata = info.pop("metadata")
inst = []
networks = []
vdisks = []
for item in instances:
d = dict()
d["Name"] = str(item["name"])
if "imported_from_vcenter" in item["metadata"]:
datastores = ast.literal_eval(item["metadata"]["datastores"])
for ds in datastores:
if ds["_type"] == "Datastore":
d["Datastore"] = ds["name"]
d["Resource Pool"] = ast.literal_eval(item["metadata"]["resourcepool"])[
"name"
]
d["VM Folder"] = ast.literal_eval(item["metadata"]["parent"])["name"]
clusters = ast.literal_eval(item["metadata"]["cluster"])
for cl in clusters:
if cl["_type"] == "Cluster":
d["Cluster"] = cl["name"]
hosts = ast.literal_eval(item["metadata"]["host"])
for hs in hosts:
if hs["_type"] == "Host":
d["Host"] = hs["name"]
networks += ast.literal_eval(item["metadata"]["networks"])
vdisks += ast.literal_eval(item["metadata"]["vdisks"])
for nt in networks:
if nt["_type"] == "Network":
d["Network"] = nt["name"]
d["VM Power State"] = str(item["status"])
else:
d["Status"] = str(item["status"])
# TODO: nics is a long string... need to handle correctly
# d['NICs'] = str(item['nics'])
if "flavor" in item:
d["Flavor"] = str(item["flavor"])
if "security_group" in item:
d["Security Group"] = str(item["security_group"])
d["ID"] = str(item["id"])
for index, nic in enumerate(item["nics"]):
item["nics"][index]["vm_id"] = item["id"]
for index, disk in enumerate(item["vdisks"]):
item["vdisks"][index]["vm_id"] = item["id"]
networks += ast.literal_eval(str(item["nics"]))
vdisks += ast.literal_eval(str(item["vdisks"]))
inst.append(d)
meta = []
for item in metadata:
m = dict()
m[str(item["key"])] = str(item["value"])
meta.append(m)
info["size"] = (
str(info["size"])
+ " Bytes or Approx ("
+ utils.bytes_fmt(info["size"])
+ ")"
)
info["restore_size"] = (
str(info["restore_size"])
+ " Bytes or Approx ("
+ utils.bytes_fmt(info["restore_size"])
+ ")"
)
info["time_taken"] = str(info["time_taken"]) + " Seconds"
[
info.pop(k)
for k in [
"pinned",
"created_at",
"finished_at",
"updated_at",
"user_id",
"project_id",
]
]
if not info["warning_msg"]:
info.pop("warning_msg")
if info["status"] != "error":
info.pop("error_msg")
if info["status"] in set(["available", "error", "mounted"]):
info.pop("progress_msg")
if not info["status"] in set(["available"]):
info.pop("restore_size")
if info["status"] in set(["mounted"]):
for m in meta:
if "mounturl" in m:
info["mounturl"] = m["mounturl"]
break
utils.print_data_vertically(inst, ["Instances", "Value"])
if parsed_args.output == "networks":
utils.print_data_vertically(networks, ["Networks", "Value"])
elif parsed_args.output == "disks":
utils.print_data_vertically(vdisks, ["Vdisks", "Value"])
elif parsed_args.output == "metadata":
utils.print_data_vertically(meta, ["Metadata", "Value"])
columns = list(info.keys())
data = osc_utils.get_dict_properties(info, columns)
return columns, data
class CancelSnapshot(SnapshotCommand):
"""Cancel a snapshot."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"snapshot_id", metavar="<snapshot_id>", help="ID of snapshot to cancel."
)
def take_action(self, parsed_args):
client = self.get_client()
snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)
snapshot_obj.cancel()
return
class DeleteSnapshot(SnapshotCommand):
"""Remove a workload snapshot."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"snapshot_id",
metavar="<snapshot_id>",
help="ID of the workload snapshot to delete.",
)
def take_action(self, parsed_args):
client = self.get_client()
snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)
snapshot_obj.delete()
return
class ListMountedSnapshot(SnapshotCommand, lister.Lister):
"""List of all mounted snapshots"""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"--workloadid",
metavar="<workloadid>",
help="Workload id (Default=None)",
default=None,
)
def take_action(self, parsed_args):
client = self.get_client()
snapshots = client.snapshot_mounted_list(parsed_args.workloadid)
if snapshots and snapshots.get("mounted_snapshots", []):
headers = [
"snapshot_id",
"snapshot_name",
"workload_id",
"mounturl",
"status"
]
columns = [
"snapshot_id",
"snapshot_name",
"workload_id",
"mounturl",
"status"
]
return (
headers,
(osc_utils.get_dict_properties(snap, columns) for snap in snapshots['mounted_snapshots']),
)
return (None, None)
class MountSnapshot(SnapshotCommand):
"""Mount a workload snapshot."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"snapshot_id",
metavar="<snapshot_id>",
help="ID of the workload snapshot to mount.",
)
parser.add_argument(
"mount_vm_id",
metavar="<mount_vm_id>",
help="VM ID that snapshot volumes mount to.",
)
parser.add_argument(
"--options",
metavar="<options>",
help="Mount options. (Default={})",
default="{}",
)
def take_action(self, parsed_args):
client = self.get_client()
snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)
options = ast.literal_eval(parsed_args.options)
snapshot_obj.mount(parsed_args.mount_vm_id, options)
print(
'Please run "workloadmgr snapshot-show --output metadata %s" to '
"get the snapshot status" % parsed_args.snapshot_id
)
return
class DismountSnapshot(SnapshotCommand):
"""Dismount a workload snapshot."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"snapshot_id",
metavar="<snapshot_id>",
help="ID of the workload snapshot to dismount.",
)
def take_action(self, parsed_args):
client = self.get_client()
snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)
snapshot_obj.dismount()
return
class InplaceRestoreSnapshot(SnapshotCommand):
"""Inplace restore of workload snapshot."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"snapshot_id",
metavar="<snapshot_id>",
help="ID of the workload snapshot to restore.",
)
parser.add_argument(
"--display-name",
metavar="<display-name>",
help="Optional name for the restore. (Default=None)",
)
parser.add_argument(
"--display-description",
metavar="<display-description>",
help="Optional description for restore. (Default=None)",
)
parser.add_argument(
"--filename",
metavar="<filename>",
type=open,
default=restore_json_path,
help="Provide file path(relative or absolute) including file name,"
"by default it will read file: %s."
"You can use this for reference or replace values into"
" this file." % restore_json_path,
)
def take_action(self, parsed_args):
name = parsed_args.display_name
description = parsed_args.display_description
file_data = utils.read_file(parsed_args.filename)
try:
json_data = yaml.load(file_data, Loader=yaml.SafeLoader)
except Exception as ex:
raise exceptions.CommandError(
"JSON conversion failed with error: {}".format(
getattr(ex, "context", ex)
)
)
if not name:
name = json_data.get('name') if json_data.get('name') else "Inplace Restore"
if not description:
description = json_data.get('description') \
if json_data.get('description') else "Inplace Restore"
client = self.get_client()
snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)
snapshot_obj.restore(False, name, description, json_data)
class OneclickRestoreSnapshot(SnapshotCommand):
"""Oneclick restore of workload snapshot."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"snapshot_id",
metavar="<snapshot_id>",
help="ID of the workload snapshot to restore.",
)
parser.add_argument(
"--display-name",
metavar="<display-name>",
help="Optional name for the restore. (Default='One Click Restore')",
default="One Click Restore",
)
parser.add_argument(
"--display-description",
metavar="<display-description>",
help="Optional description for restore. (Default='One Click Restore')",
default="One Click Restore",
)
def take_action(self, parsed_args):
client = self.get_client()
snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)
options = ast.literal_eval(
"{'openstack': {}, 'type': 'openstack', 'oneclickrestore': True,"
" 'restore_type': 'oneclick', 'vmware': {}}"
)
snapshot_obj.restore(
False, parsed_args.display_name, parsed_args.display_description, options
)
class SelectiveRestoreSnapshot(SnapshotCommand):
"""Selective restore of workload snapshot."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"snapshot_id",
metavar="<snapshot_id>",
help="ID of the workload snapshot to restore.",
)
parser.add_argument(
"--display-name",
metavar="<display-name>",
help="Optional name for the restore. (Default=None)",
)
parser.add_argument(
"--display-description",
metavar="<display-description>",
help="Optional description for restore. (Default=None)",
)
parser.add_argument(
"--filename",
metavar="<filename>",
type=open,
default=restore_json_path,
help="Provide file path(relative or absolute) including file name,"
"by default it will read file: %s."
"You can use this for reference or replace values into"
" this file." % restore_json_path,
)
def take_action(self, parsed_args):
name = parsed_args.display_name
description = parsed_args.display_description
file_data = utils.read_file(parsed_args.filename)
try:
json_data = yaml.load(file_data, Loader=yaml.SafeLoader)
except Exception as ex:
raise exceptions.CommandError(
"JSON conversion failed with error: {}".format(
getattr(ex, "context", ex)
)
)
if not name:
name = json_data.get('name') if json_data.get('name') else "Selective Restore"
if not description:
description = json_data.get('description') \
if json_data.get('description') else "Selective Restore"
client = self.get_client()
snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)
snapshot_obj.restore(False, name, description, json_data)
class RestoreNetworkTopology(SnapshotCommand):
"""Restores only network topology from a snapshot."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"snapshot_id",
metavar="<snapshot_id>",
help="ID of a snapshot, which network topology needs to be restored.",
)
parser.add_argument(
"--display-name",
metavar="<display-name>",
help="Optional name for the restore.",
default="Network Topology Restore",
)
parser.add_argument(
"--display-description",
metavar="<display-description>",
help="Optional description for restore.",
default="Network Topology Restore",
)
def take_action(self, parsed_args):
client = self.get_client()
snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)
snapshot_obj.restore_network_topology(
parsed_args.display_name, parsed_args.display_description
)
class RestoreSecurityGroups(SnapshotCommand):
"""Restores only security groups from a snapshot."""
@staticmethod
def _add_arguments(parser):
parser.add_argument(
"snapshot_id",
metavar="<snapshot_id>",
help="ID of a snapshot, which security groups needs to be restored.",
)
parser.add_argument(
"--display-name",
metavar="<display-name>",
help="Optional name for the restore.",
default="Security Groups Restore",
)
parser.add_argument(
"--display-description",
metavar="<display-description>",
help="Optional description for restore.",
default="Security Groups Restore",
)
def take_action(self, parsed_args):
client = self.get_client()
snapshot_obj = utils.find_resource(client, parsed_args.snapshot_id)
snapshot_obj.restore_security_groups(
parsed_args.display_name, parsed_args.display_description
)