Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages

Repository URL to install this package:

Details    
namara-python / rpc / sources_pb2_twirp.py
Size: Mime:
# Code generated by protoc-gen-twirp_python v5.9.0, DO NOT EDIT.
# source: sources.proto

try:
    import httplib
    from urllib2 import Request, HTTPError, urlopen
except ImportError:
    import http.client as httplib
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError
import json
from google.protobuf import symbol_database as _symbol_database
import sys

# Default protobuf symbol database. The client methods in this module look up
# the "sources.*" message classes from it by fully-qualified name via
# GetSymbol().
_sym_db = _symbol_database.Default()

class TwirpException(httplib.HTTPException):
    """Error raised for a failed Twirp request.

    Attributes:
        code: The error code string taken from the "code" field of the JSON
            error body, or "internal" when the body could not be parsed.
        message: The error message taken from the "msg" field of the JSON
            error body, or a generated description of the HTTP status.
        meta: The "meta" value of the JSON error body; an empty dict when it
            is absent or null.
    """

    def __init__(self, code, message, meta):
        self.code = code
        self.message = message
        self.meta = meta
        super(TwirpException, self).__init__(message)

    @classmethod
    def from_http_err(cls, err):
        """Build a TwirpException from an HTTP error response.

        Args:
            err: A readable, file-like error object (such as
                urllib's HTTPError) exposing the HTTP status as ``err.code``.

        Returns:
            A new exception populated from the JSON error body. If the body
            is not valid JSON or lacks the "code"/"msg" keys, the result has
            code "internal" and a message describing the HTTP status.
        """
        try:
            jsonerr = json.load(err)
            code = jsonerr["code"]
            msg = jsonerr["msg"]
            meta = jsonerr.get("meta")
            if meta is None:
                meta = {}
        except Exception:
            # The body is not a well-formed Twirp error (e.g. an HTML page
            # produced by a proxy). `Exception` rather than a bare `except`
            # so KeyboardInterrupt/SystemExit are not swallowed.
            code = "internal"
            # Use .get(): non-standard status codes are missing from
            # httplib.responses and must not raise KeyError here, which
            # would hide the original HTTP failure.
            msg = "Error from intermediary with HTTP status code {} {}".format(
                err.code, httplib.responses.get(err.code, "Unknown"),
            )
            meta = {}
        return cls(code, msg, meta)

class SourcesServiceClient(object):
    """Client for the ``sources.SourcesService`` Twirp service.

    Every public method serializes its request message, sends it to
    ``<server_address>/twirp/sources.SourcesService/<Method>`` with the
    ``application/protobuf`` content type, and deserializes the response
    body into the matching response message. An HTTP error response is
    raised as a TwirpException.
    """

    def __init__(self, server_address, request_decorator=None):
        """Creates a new client for the SourcesService service.

        Args:
            server_address: The address of the server to send requests to, in
                the full protocol://host:port form.
            request_decorator: A function to modify the http request being
                sent. Takes in a urllib.request.Request object and returns
                the request object to send (its return value replaces the
                original request).
        """
        if sys.version_info[0] > 2:
            self.__target = server_address
        else:
            # Python 2: store the address as an ASCII byte string.
            self.__target = server_address.encode('ascii')
        self.__service_name = "sources.SourcesService"

        self.__request_decorator = request_decorator

    def __make_request(self, body, full_method):
        """Send ``body`` to ``full_method`` and return the raw response bytes.

        Args:
            body: The serialized request message.
            full_method: The "/<service>/<method>" path appended after
                "/twirp".

        Raises:
            TwirpException: If urlopen raises an HTTPError.
        """
        req = Request(
            url=self.__target + "/twirp" + full_method,
            data=body,
            headers={"Content-Type": "application/protobuf"},
        )

        if self.__request_decorator:
            req = self.__request_decorator(req)

        try:
            resp = urlopen(req)
        except HTTPError as err:
            raise TwirpException.from_http_err(err)

        # Always release the response (and its connection), even when
        # reading fails. try/finally instead of `with` because Python 2
        # responses are not context managers.
        try:
            return resp.read()
        finally:
            resp.close()

    def __call_rpc(self, method_name, request_symbol, response_symbol, request):
        """Run one RPC: serialize, send, and deserialize.

        Args:
            method_name: The RPC method name used in the URL path.
            request_symbol: Fully-qualified name of the request message type.
            response_symbol: Fully-qualified name of the response message
                type.
            request: The request message instance to serialize.

        Returns:
            The response message parsed from the response body.
        """
        # Resolve both message types before any network traffic so an
        # unknown symbol fails without sending a request.
        serialize = _sym_db.GetSymbol(request_symbol).SerializeToString
        deserialize = _sym_db.GetSymbol(response_symbol).FromString

        full_method = "/{}/{}".format(self.__service_name, method_name)
        body = serialize(request)
        resp_str = self.__make_request(body=body, full_method=full_method)
        return deserialize(resp_str)

    # ======= Sources ===========

    def create_source(self, create_source_request):
        """Create the source.

        Args:
            create_source_request: A sources.CreateSourceRequest message.

        Returns:
            A sources.CreateSourceResponse message.
        """
        return self.__call_rpc(
            "CreateSource",
            "sources.CreateSourceRequest",
            "sources.CreateSourceResponse",
            create_source_request,
        )

    def list_sources(self, list_sources_request):
        """Lists all sources.

        Args:
            list_sources_request: A sources.ListSourcesRequest message.

        Returns:
            A sources.ListSourcesResponse message.
        """
        return self.__call_rpc(
            "ListSources",
            "sources.ListSourcesRequest",
            "sources.ListSourcesResponse",
            list_sources_request,
        )

    def get_source(self, get_source_request):
        """Gets a specific source.

        Args:
            get_source_request: A sources.GetSourceRequest message.

        Returns:
            A sources.GetSourceResponse message.
        """
        return self.__call_rpc(
            "GetSource",
            "sources.GetSourceRequest",
            "sources.GetSourceResponse",
            get_source_request,
        )

    def update_source(self, update_source_request):
        """Updates a source.

        Args:
            update_source_request: A sources.UpdateSourceRequest message.

        Returns:
            A sources.UpdateSourceResponse message.
        """
        return self.__call_rpc(
            "UpdateSource",
            "sources.UpdateSourceRequest",
            "sources.UpdateSourceResponse",
            update_source_request,
        )

    def delete_source(self, delete_source_request):
        """Deletes a source.

        Args:
            delete_source_request: A sources.DeleteSourceRequest message.

        Returns:
            A sources.DeleteSourceResponse message.
        """
        return self.__call_rpc(
            "DeleteSource",
            "sources.DeleteSourceRequest",
            "sources.DeleteSourceResponse",
            delete_source_request,
        )

    def get_import_lite_signed_url(self, get_import_lite_signed_url_request):
        """Gets or Creates and returns a source for an organizations import lite.

        Args:
            get_import_lite_signed_url_request: A
                sources.GetImportLiteSignedUrlRequest message.

        Returns:
            A sources.GetImportLiteSignedUrlResponse message.
        """
        return self.__call_rpc(
            "GetImportLiteSignedUrl",
            "sources.GetImportLiteSignedUrlRequest",
            "sources.GetImportLiteSignedUrlResponse",
            get_import_lite_signed_url_request,
        )

    # ========= Credentials ==============

    def create_credential(self, create_credential_request):
        """Creates a new credential.

        Args:
            create_credential_request: A sources.CreateCredentialRequest
                message.

        Returns:
            A sources.CreateCredentialResponse message.
        """
        return self.__call_rpc(
            "CreateCredential",
            "sources.CreateCredentialRequest",
            "sources.CreateCredentialResponse",
            create_credential_request,
        )

    def list_credentials(self, list_credentials_request):
        """Lists all credentials.

        Args:
            list_credentials_request: A sources.ListCredentialsRequest
                message.

        Returns:
            A sources.ListCredentialsResponse message.
        """
        return self.__call_rpc(
            "ListCredentials",
            "sources.ListCredentialsRequest",
            "sources.ListCredentialsResponse",
            list_credentials_request,
        )

    def get_credential(self, get_credential_request):
        """Gets a specific credential.

        Args:
            get_credential_request: A sources.GetCredentialRequest message.

        Returns:
            A sources.GetCredentialResponse message.
        """
        return self.__call_rpc(
            "GetCredential",
            "sources.GetCredentialRequest",
            "sources.GetCredentialResponse",
            get_credential_request,
        )

    def update_credential(self, update_credential_request):
        """Updates a credential.

        Args:
            update_credential_request: A sources.UpdateCredentialRequest
                message.

        Returns:
            A sources.UpdateCredentialResponse message.
        """
        return self.__call_rpc(
            "UpdateCredential",
            "sources.UpdateCredentialRequest",
            "sources.UpdateCredentialResponse",
            update_credential_request,
        )

    def delete_credential(self, delete_credential_request):
        """Deletes a credential.

        Args:
            delete_credential_request: A sources.DeleteCredentialRequest
                message.

        Returns:
            A sources.DeleteCredentialResponse message.
        """
        return self.__call_rpc(
            "DeleteCredential",
            "sources.DeleteCredentialRequest",
            "sources.DeleteCredentialResponse",
            delete_credential_request,
        )

    # ============ Warehouses ====================

    def list_warehouses(self, list_warehouses_request):
        """Calls the ListWarehouses RPC.

        Args:
            list_warehouses_request: A sources.ListWarehousesRequest message.

        Returns:
            A sources.ListWarehousesResponse message.
        """
        return self.__call_rpc(
            "ListWarehouses",
            "sources.ListWarehousesRequest",
            "sources.ListWarehousesResponse",
            list_warehouses_request,
        )

    # ============= Health check ================

    def healthz(self, empty):
        """Calls the Healthz RPC.

        Args:
            empty: A sources.Empty message.

        Returns:
            A sources.Empty message.
        """
        return self.__call_rpc(
            "Healthz",
            "sources.Empty",
            "sources.Empty",
            empty,
        )