Repository URL to install this package:
|
Version:
4.1.94.1.dev5 ▾
|
import attr
from workloadmgr.common import context as wlm_context
from workloadmgr.common.constants import quota_type_id_map, quota_type_name_map
def get_snapshot_count(context):
    """Placeholder for the snapshot count; ignores ``context`` and returns 1."""
    # TODO: add support to calculate actual value
    return 1
def get_vm_count(context):
    """Placeholder for the VM count; ignores ``context`` and returns 1."""
    # TODO: add support to calculate actual value
    return 1
def get_volume_count(context):
    """Placeholder for the volume count; ignores ``context`` and returns 1."""
    # TODO: add support to calculate actual value
    return 1
def get_storage(context):
    """Placeholder for the storage usage; ignores ``context`` and returns 1."""
    # TODO: add support to calculate actual value
    return 1
# Description text for every supported quota type, keyed by the quota
# type's name.
quota_type_desc_map = {
    "Workloads": "Total number of workload creation allowed per project",
    "Snapshots": "Total number of snapshot creation allowed per project",
    "VMs": "Total number of VMs allowed per project",
    "Volumes": "Total number of volume attachments allowed per project",
    "Storage": "Total storage (in Bytes) allowed per project",
}
@attr.s
class ProjectQuotaTypes:
    """Validated record describing a single project quota type.

    ``display_name`` is checked against the keys of ``quota_type_desc_map``
    when an instance is created; ``display_description`` is optional.
    """

    id = attr.ib(type=str)
    display_name = attr.ib(type=str)
    display_description = attr.ib(type=str, default=None)

    @display_name.validator
    def _validate_display_name(self, attribute, value):
        """Raise ``ValueError`` unless ``value`` is a known quota type name."""
        if value in quota_type_desc_map:
            return
        known_names = list(quota_type_desc_map.keys())
        raise ValueError(
            "display_name must one of following: {}".format(known_names)
        )

    def to_dict(self):
        """Return this instance's attributes as a plain dictionary."""
        field_names = ("id", "display_name", "display_description")
        return {name: getattr(self, name) for name in field_names}

    @staticmethod
    def deserialize(obj):
        """Convert ``obj`` into a dictionary of quota type fields.

        Returns an empty dictionary when ``obj`` is falsy; otherwise reads
        ``id``, ``display_name`` and ``display_description`` from it.
        """
        if not obj:
            return {}
        field_names = ("id", "display_name", "display_description")
        return {name: getattr(obj, name) for name in field_names}
@attr.s
class AllowedQuota:
    """Validator class for allowed quota data.

    Instances are validated on construction:

    * ``allowed_value`` may not be below -1 and, when ``actual_value`` is
      set, may not be lower than ``actual_value``.
    * ``high_watermark`` may not be below -1, requires ``allowed_value`` to
      be defined and may not exceed it.

    A value of -1 (or any falsy value) skips the corresponding checks.
    NOTE(review): -1 looks like the "no limit / not set" sentinel - confirm
    against the callers.
    """

    project_id = attr.ib(type=str)
    quota_type_id = attr.ib(type=str)
    allowed_value = attr.ib(type=int, default=-1)
    actual_value = attr.ib(type=int, default=0)
    high_watermark = attr.ib(type=int)

    @high_watermark.default
    def _set_default_based_on_allowed_value(self):
        """Default the watermark to 80% of a positive ``allowed_value``, else -1."""
        if self.allowed_value > 0:
            return int((self.allowed_value / 100) * 80)
        return -1

    @allowed_value.validator
    def _validate_allowed_value(self, attribute, value):
        """Check ``allowed_value`` against its lower bound and ``actual_value``.

        Raises:
            ValueError: if the value is below -1, or if ``actual_value`` is
                set and greater than the value.
        """
        # Falsy values and the -1 sentinel are accepted without checks.
        if not value or int(value) == -1:
            return
        if int(value) < -1:
            raise ValueError(
                "allowed_value must be whole number"
            )
        if self.actual_value and int(value) < int(self.actual_value):
            raise ValueError(
                "actual_value must be smaller or equal to allowed_value"
            )

    @high_watermark.validator
    def _validate_high_watermark(self, attribute, value):
        """Check ``high_watermark`` against its lower bound and ``allowed_value``.

        Raises:
            ValueError: if the value is below -1, if ``allowed_value`` is
                not defined, or if the value is greater than
                ``allowed_value``.
        """
        # Falsy values and the -1 sentinel are accepted without checks.
        if not value or int(value) == -1:
            return
        if int(value) < -1:
            raise ValueError(
                "high_watermark must be whole number"
            )
        # This check has to run before allowed_value is converted with
        # int(); otherwise it can never trigger and a missing allowed_value
        # surfaces as a TypeError instead of this message.
        if not self.allowed_value:
            raise ValueError(
                "high_watermark is valid only if allowed_value is defined"
            )
        if int(value) > int(self.allowed_value):
            raise ValueError(
                "high_watermark must be smaller than allowed_value"
            )

    def to_dict(self, avoid_none=False):
        """Return the quota as a dictionary.

        Args:
            avoid_none: when false, the result always holds ``project_id``,
                ``quota_type_id``, ``allowed_value`` and ``high_watermark``.
                When true, ``allowed_value``, ``actual_value`` and
                ``high_watermark`` are included only if they are truthy
                (so ``None`` and ``0`` are left out).
        """
        data = {
            "project_id": self.project_id,
            "quota_type_id": self.quota_type_id,
        }
        if not avoid_none:
            data["allowed_value"] = self.allowed_value
            data["high_watermark"] = self.high_watermark
            return data

        # attrs instances are not mappings, so the fields are read with
        # getattr rather than a dict-style .get().
        for key in ("allowed_value", "actual_value", "high_watermark"):
            value = getattr(self, key, None)
            if value:
                data[key] = value
        return data

    @staticmethod
    def deserialize(obj):
        """Convert ``obj`` into a dictionary of allowed-quota fields.

        Returns an empty dictionary when ``obj`` is falsy. Otherwise the
        result carries the object's ``id``, ``project_id``,
        ``quota_type_id``, ``allowed_value``, ``high_watermark`` and
        ``version``, plus ``quota_type_name`` looked up in
        ``quota_type_name_map`` (empty string when the id is unknown).
        """
        if obj:
            return {
                "id": obj.id,
                "project_id": obj.project_id,
                "quota_type_id": obj.quota_type_id,
                "allowed_value": obj.allowed_value,
                "high_watermark": obj.high_watermark,
                "version": obj.version,
                "quota_type_name": quota_type_name_map.get(
                    obj.quota_type_id, ''
                )
            }
        return {}
def workload_quota_check(project_id, db, import_count=1):
    """Validate a project's workload count against its allowed quota.

    The current workload count is read through ``db`` with an admin
    context and increased by ``import_count`` when that is positive. The
    total is then used as ``actual_value`` when building an
    ``AllowedQuota`` from the first "Workloads" quota row of the project.

    Returns:
        The validated ``AllowedQuota``, or ``None`` when the project has no
        allowed quota row for workloads.

    Raises:
        Exception: with a quota-exceeded message when building the quota
            fails and ``import_count`` is truthy; otherwise the original
            error is re-raised.
    """
    ctx = wlm_context.get_admin_context()
    projected_count = db.get_workload_count(ctx, project_id)
    if import_count and import_count > 0:
        projected_count += import_count

    quota_rows = db.get_allowed_quotas(
        ctx, project_id, allowed_quota_id=None,
        quota_type_id=quota_type_id_map["Workloads"]
    )
    if len(quota_rows) == 0:
        return None

    try:
        fields = AllowedQuota.deserialize(quota_rows[0])
        fields["actual_value"] = projected_count
        # These keys are produced by deserialize() but are not attributes
        # of AllowedQuota.
        for extra_key in ("quota_type_name", "version", "id"):
            fields.pop(extra_key, None)
        return AllowedQuota(**fields)
    except Exception as err:
        if import_count:
            raise Exception(
                "Can't perform workload creation/updation because project allowed quota will exceed"
            )
        raise err
def storage_quota_check(project_id, db, total_storage, next_storage=None):
    """Validate a project's storage usage against its allowed quota.

    ``next_storage`` is added to ``total_storage`` when it is truthy. The
    total is then used as ``actual_value`` when building an
    ``AllowedQuota`` from the first "Storage" quota row of the project.

    Returns:
        The validated ``AllowedQuota``, or ``None`` when the project has no
        allowed quota row for storage.

    Raises:
        ValueError: with a quota-exceeded message when building the quota
            fails and ``next_storage`` is truthy; otherwise the original
            error is re-raised.
    """
    if next_storage:
        total_storage += next_storage

    ctx = wlm_context.get_admin_context()
    quota_rows = db.get_allowed_quotas(
        ctx, project_id, allowed_quota_id=None,
        quota_type_id=quota_type_id_map["Storage"]
    )
    if len(quota_rows) == 0:
        return None

    try:
        fields = AllowedQuota.deserialize(quota_rows[0])
        fields["actual_value"] = total_storage
        # These keys are produced by deserialize() but are not attributes
        # of AllowedQuota.
        for extra_key in ("quota_type_name", "version", "id"):
            fields.pop(extra_key, None)
        return AllowedQuota(**fields)
    except Exception as err:
        if next_storage:
            raise ValueError(
                "Can't perform snapshot because project storage allowed quota will exceed"
            )
        raise err
def vms_quota_check(project_id, db, bulk_count=1):
    """Validate a project's VM count against its allowed quota.

    The current VM count is read through ``db`` with an admin context and
    increased by ``bulk_count`` when that is positive. The total is then
    used as ``actual_value`` when building an ``AllowedQuota`` from the
    first "VMs" quota row of the project.

    Returns:
        The validated ``AllowedQuota``, or ``None`` when the project has no
        allowed quota row for VMs.

    Raises:
        Exception: with a quota-exceeded message when building the quota
            fails and ``bulk_count`` is truthy; otherwise the original
            error is re-raised.
    """
    ctx = wlm_context.get_admin_context()
    projected_vms = db.get_vms_count_by_project_id(ctx, project_id)
    if bulk_count and bulk_count > 0:
        projected_vms += bulk_count

    quota_rows = db.get_allowed_quotas(
        ctx, project_id, allowed_quota_id=None,
        quota_type_id=quota_type_id_map["VMs"]
    )
    if len(quota_rows) == 0:
        return None

    try:
        fields = AllowedQuota.deserialize(quota_rows[0])
        fields["actual_value"] = projected_vms
        # These keys are produced by deserialize() but are not attributes
        # of AllowedQuota.
        for extra_key in ("quota_type_name", "version", "id"):
            fields.pop(extra_key, None)
        return AllowedQuota(**fields)
    except Exception as err:
        if bulk_count:
            raise Exception(
                "Can't perform workload creation because project's VM allowed quota will exceed"
            )
        raise err
def snapshot_quota_check(project_id, db, import_count=1):
    """Validate a project's active snapshot count against its allowed quota.

    The active snapshot count is read through ``db`` with an admin context
    and increased by ``import_count`` when that is positive. The total is
    then used as ``actual_value`` when building an ``AllowedQuota`` from
    the first "Snapshots" quota row of the project.

    Returns:
        The validated ``AllowedQuota``, or ``None`` when the project has no
        allowed quota row for snapshots.

    Raises:
        ValueError: with a quota-exceeded message when building the quota
            fails and ``import_count`` is truthy; otherwise the original
            error is re-raised.
    """
    ctx = wlm_context.get_admin_context()
    projected_snapshots = db.get_active_snapshot_count(ctx, project_id)
    if import_count and import_count > 0:
        projected_snapshots += import_count

    quota_rows = db.get_allowed_quotas(
        ctx, project_id, allowed_quota_id=None,
        quota_type_id=quota_type_id_map["Snapshots"]
    )
    if len(quota_rows) == 0:
        return None

    try:
        fields = AllowedQuota.deserialize(quota_rows[0])
        fields["actual_value"] = projected_snapshots
        # These keys are produced by deserialize() but are not attributes
        # of AllowedQuota.
        for extra_key in ("quota_type_name", "version", "id"):
            fields.pop(extra_key, None)
        return AllowedQuota(**fields)
    except Exception as err:
        if import_count:
            raise ValueError(
                "Current project's snapshot allowed quota has exceeded. Contact to the admin"
            )
        raise err