Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
launchdarkly-server-sdk / stub_util.py
Size: Mime:
from email.utils import formatdate
from requests.structures import CaseInsensitiveDict

from ldclient.interfaces import EventProcessor, FeatureRequester, FeatureStore, UpdateProcessor


class MockEventProcessor(EventProcessor):
    def __init__(self, *_):
        self._running = False
        self._events = []

    def stop(self):
        self._running = False

    def start(self):
        self._running = True

    def is_alive(self):
        return self._running

    def send_event(self, event):
        self._events.append(event)

    def flush(self):
        pass

class MockFeatureRequester(FeatureRequester):
    def __init__(self):
        self.all_data = {}
        self.exception = None
        self.request_count = 0

    def get_all_data(self):
        self.request_count += 1
        if self.exception is not None:
            raise self.exception
        return self.all_data

    def get_one(self, kind, key):
        pass

class MockResponse(object):
    def __init__(self, status, headers):
        self._status = status
        self._headers = headers

    @property
    def status(self):
        return self._status

    def getheader(self, name):
        return self._headers.get(name.lower())

class MockHttp(object):
    def __init__(self):
        self._request_data = None
        self._request_headers = None
        self._response_status = 200
        self._server_time = None

    def request(self, method, uri, headers, timeout, body, retries):
        self._request_headers = headers
        self._request_data = body
        resp_hdr = dict()
        if self._server_time is not None:
            resp_hdr['date'] = formatdate(self._server_time / 1000, localtime=False, usegmt=True)
        return MockResponse(self._response_status, resp_hdr)

    def clear(self):
        pass

    @property
    def request_data(self):
        return self._request_data

    @property
    def request_headers(self):
        return self._request_headers

    def set_response_status(self, status):
        self._response_status = status
    
    def set_server_time(self, timestamp):
        self._server_time = timestamp

    def reset(self):
        self._request_headers = None
        self._request_data = None

class MockUpdateProcessor(UpdateProcessor):
    def __init__(self, config, store, ready):
        ready.set()

    def start(self):
        pass

    def stop(self):
        pass

    def is_alive(self):
        return True

    def initialized(self):
        return True

class CapturingFeatureStore(FeatureStore):
    def init(self, all_data):
        self.data = all_data

    def get(self, kind, key, callback=lambda x: x):    
        pass
    
    def all(self, kind, callback=lambda x: x):
        pass
    
    def delete(self, kind, key, version):
        pass
    
    def upsert(self, kind, item):
        pass
    
    @property
    def initialized(self):
        return True
    
    @property
    def received_data(self):
        return self.data