Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
supermeter / supermeter / data / base.py
Size: Mime:
try:
    from typing import Dict, Optional
except ImportError:
    pass
from supertenant import consts


class BaseData(object):  # py2.7 use new-style class
    def __init__(self, resource_type, integration_module):
        # type: (str, str) -> None
        self.tags = {}  # type: Dict[str, str]
        self.set_tag(consts.LABEL_SUPERTENANT_RESOURCE_TYPE, resource_type)
        self.set_tag(consts.LABEL_SUPERTENANT_INTEGRATION_MODULE, integration_module)

    def get_tags(self):
        # type: () -> Dict[str, str]
        return self.tags

    def set_tag(self, key, value):
        # type: (str, str) -> None
        self.tags[key] = value

    def get_tag(self, key):
        # type: (str) -> Optional[str]
        return self.tags.get(key)

    def mark_error(self):
        # type: () -> None
        # TODO: change label value (remove http)
        self.set_tag(consts.LABEL_SUPERTENANT_ERROR, "true")

    def set_span_type(self, span_type):
        # type: (int) -> None
        self.set_tag(consts.LABEL_SUPERTENANT_SPAN_TYPE, "%d" % (span_type,))

    def get_span_type(self):
        # type: () -> Optional[int]
        s = self.get_tag(consts.LABEL_SUPERTENANT_SPAN_TYPE)
        return int(s) if s else None

    def set_integration_module_resource_id(self, value):
        # type: (str) -> None
        self.set_tag(consts.LABEL_INTEGRATION_MODULE_RESOURCE_ID, value)

    def get_integration_module_resource_id(self):
        # type: () -> Optional[str]
        return self.get_tag(consts.LABEL_INTEGRATION_MODULE_RESOURCE_ID)

    def __repr__(self):
        # type: () -> str
        return "BaseData " + repr(self.tags)


def new_base_data(resource_type, integration_module, tags):
    # type: (str, str, Dict[str, str]) -> BaseData
    bd = BaseData(resource_type, integration_module)
    for k, v in tags.items():
        bd.set_tag(k, v)
    return bd