Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import attr
from workloadmgr.common import context as wlm_context
from workloadmgr.common.constants import quota_type_id_map, quota_type_name_map


def get_snapshot_count(context):
    # TODO: add support to calculate actual value
    return 1


def get_vm_count(context):
    # TODO: add support to calculate actual value
    return 1


def get_volume_count(context):
    # TODO: add support to calculate actual value
    return 1


def get_storage(context):
    # TODO: add support to calculate actual value
    return 1


quota_type_desc_map = {
    "Workloads": "Total number of workload creation allowed per project",
    "Snapshots": "Total number of snapshot creation allowed per project",
    "VMs": "Total number of VMs allowed per project",
    "Volumes": "Total number of volume attachments allowed per project",
    "Storage": "Total storage (in Bytes) allowed per project",
}


@attr.s
class ProjectQuotaTypes:
    id = attr.ib(type=str)
    display_name = attr.ib(type=str)
    display_description = attr.ib(type=str, default=None)

    @display_name.validator
    def _validate_display_name(self, attribute, value):
        if value not in quota_type_desc_map:
            raise ValueError(
                "display_name must one of following: {}".format(
                    list(quota_type_desc_map.keys())
                )
            )

    def to_dict(self):
        return {
            "id": self.id,
            "display_name": self.display_name,
            "display_description": self.display_description,
        }

    @staticmethod
    def deserialize(obj):
        if obj:
            return {
                "id": obj.id,
                "display_name": obj.display_name,
                "display_description": obj.display_description
            }
        return {}


@attr.s
class AllowedQuota:
    """ Validator class for allowed quota data """
    project_id = attr.ib(type=str)
    quota_type_id = attr.ib(type=str)
    allowed_value = attr.ib(type=int, default=-1)
    actual_value = attr.ib(type=int, default=0)
    high_watermark = attr.ib(type=int)

    @high_watermark.default
    def _set_default_based_on_allowed_value(self):
        if self.allowed_value > 0:
            return int((self.allowed_value / 100) * 80)
        return -1

    @allowed_value.validator
    def _validate_allowed_value(self, attribute, value):
        if value and int(value) != -1:
            if int(value) < -1:
                raise ValueError(
                    "allowed_value must be whole number"
                )
            elif self.actual_value and (
                int(value) < int(self.actual_value)
            ):
                raise ValueError(
                    "actual_value must be smaller or equal to allowed_value"
                )

    @high_watermark.validator
    def _validate_high_watermark(self, attribute, value):
        if value and int(value) != -1:
            if int(value) < -1:
                raise ValueError(
                    "high_watermark must be whole number"
                )
            else:
                if int(value) > int(self.allowed_value):
                    raise ValueError(
                        "high_watermark must be smaller than allowed_value"
                    )
                if not self.allowed_value:
                    raise ValueError(
                        "high_watermark is valid only if allowed_value is defined"
                    )

    def to_dict(self, avoid_none=False):
        if not avoid_none:
            return {
                "project_id": self.project_id,
                "quota_type_id": self.quota_type_id,
                "allowed_value": self.allowed_value,
                "high_watermark": self.high_watermark
            }
        else:
            data = {
                "project_id": self.project_id,
                "quota_type_id": self.quota_type_id
            }
            filter_keys = ["allowed_value", "actual_value", "high_watermark"]
            for key in filter_keys:
                if self.get(key, None):
                    data[key] = self.get(key)
            return data

    @staticmethod
    def deserialize(obj):
        if obj:
            return {
                "id": obj.id,
                "project_id": obj.project_id,
                "quota_type_id": obj.quota_type_id,
                "allowed_value": obj.allowed_value,
                "high_watermark": obj.high_watermark,
                "version": obj.version,
                "quota_type_name": quota_type_name_map.get(
                    obj.quota_type_id, ''
                )
            }
        return {}


def workload_quota_check(project_id, db, import_count=1):
    admin_context = wlm_context.get_admin_context()
    workload_count = db.get_workload_count(admin_context, project_id)
    if import_count and import_count > 0:
        workload_count += import_count
    allowed_quota = db.get_allowed_quotas(
        admin_context, project_id, allowed_quota_id=None,
        quota_type_id=quota_type_id_map["Workloads"]
    )
    if len(allowed_quota):
        try:
            data = AllowedQuota.deserialize(allowed_quota[0])
            data.update({"actual_value": workload_count})
            data.pop("quota_type_name", None)
            data.pop("version", None)
            data.pop("id", None)
            return AllowedQuota(**data)
        except Exception as err:
            if import_count:
                raise Exception(
                    "Can't perform workload creation/updation because project allowed quota will exceed"
                )
            raise err
    return None


def storage_quota_check(project_id, db, total_storage, next_storage=None):
    if next_storage:
        total_storage += next_storage
    admin_context = wlm_context.get_admin_context()
    allowed_quota = db.get_allowed_quotas(
        admin_context, project_id, allowed_quota_id=None,
        quota_type_id=quota_type_id_map["Storage"]
    )
    if len(allowed_quota):
        try:
            data = AllowedQuota.deserialize(allowed_quota[0])
            data.update({"actual_value": total_storage})
            data.pop("quota_type_name", None)
            data.pop("version", None)
            data.pop("id", None)
            return AllowedQuota(**data)
        except Exception as err:
            if next_storage:
                raise ValueError(
                    "Can't perform snapshot because project storage allowed quota will exceed"
                )
            raise err
    return None


def vms_quota_check(project_id, db, bulk_count=1):
    admin_context = wlm_context.get_admin_context()
    snapshot_count = db.get_vms_count_by_project_id(admin_context, project_id)
    if bulk_count and bulk_count > 0:
        snapshot_count += bulk_count

    allowed_quota = db.get_allowed_quotas(
        admin_context, project_id, allowed_quota_id=None,
        quota_type_id=quota_type_id_map["VMs"]
    )
    if len(allowed_quota):
        try:
            data = AllowedQuota.deserialize(allowed_quota[0])
            data.update({"actual_value": snapshot_count})
            data.pop("quota_type_name", None)
            data.pop("version", None)
            data.pop("id", None)
            return AllowedQuota(**data)
        except Exception as err:
            if bulk_count:
                raise Exception(
                    "Can't perform workload creation because project's VM allowed quota will exceed"
                )
            raise err
    return None


def snapshot_quota_check(project_id, db, import_count=1):
    admin_context = wlm_context.get_admin_context()
    snapshot_count = db.get_active_snapshot_count(admin_context, project_id)
    if import_count and import_count > 0:
        snapshot_count += import_count
    allowed_quota = db.get_allowed_quotas(
        admin_context, project_id, allowed_quota_id=None,
        quota_type_id=quota_type_id_map["Snapshots"]
    )
    if len(allowed_quota):
        try:
            data = AllowedQuota.deserialize(allowed_quota[0])
            data.update({"actual_value": snapshot_count})
            data.pop("quota_type_name", None)
            data.pop("version", None)
            data.pop("id", None)
            return AllowedQuota(**data)
        except Exception as err:
            if import_count:
                raise ValueError(
                    "Current project's snapshot allowed quota has exceeded. Contact to the admin"
                )
            raise err
    return None