# Repository URL to install this package:
# Version: 1.0.18
# Code generated by protoc-gen-twirp_python v5.9.0, DO NOT EDIT.
# source: sources.proto
try:
    import httplib
    from urllib2 import Request, HTTPError, urlopen
except ImportError:
    import http.client as httplib
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError
import json
import sys
from contextlib import closing

from google.protobuf import symbol_database as _symbol_database
# Default protobuf symbol database; the client below looks up request and
# response message classes in it by their full name (e.g. "sources.Empty").
_sym_db = _symbol_database.Default()
class TwirpException(httplib.HTTPException):
    """Error raised when a Twirp request fails with an HTTP error response.

    Attributes:
        code: Value of the "code" field of the JSON error body, or
            "internal" when the body is not a usable Twirp error.
        message: Value of the "msg" field of the JSON error body, or a
            generic message built from the HTTP status code.
        meta: Value of the "meta" field of the JSON error body; an empty
            dict when the field is missing or null.
    """

    def __init__(self, code, message, meta):
        self.code = code
        self.message = message
        self.meta = meta
        super(TwirpException, self).__init__(message)

    @classmethod
    def from_http_err(cls, err):
        """Build a TwirpException from an HTTP error response.

        Args:
            err: A file-like HTTP error (it must provide ``read()`` and a
                ``code`` attribute holding the HTTP status code).

        Returns:
            A TwirpException populated from the JSON body when it has the
            "code" and "msg" fields; otherwise an "internal" error whose
            message mentions the HTTP status code.
        """
        try:
            jsonerr = json.load(err)
            code = jsonerr["code"]
            msg = jsonerr["msg"]
            meta = jsonerr.get("meta")
            if meta is None:
                meta = {}
        except Exception:
            # The body is not a Twirp JSON error (for example an HTML page
            # from a proxy). ``Exception`` rather than a bare ``except`` so
            # KeyboardInterrupt/SystemExit still propagate.
            code = "internal"
            # ``.get`` because the status code may be a non-standard one
            # that is absent from the reason-phrase table.
            msg = "Error from intermediary with HTTP status code {} {}".format(
                err.code, httplib.responses.get(err.code, "Unknown"),
            )
            meta = {}
        return cls(code, msg, meta)
class SourcesServiceClient(object):
    """Twirp client for the ``sources.SourcesService`` service.

    Every public method serializes its protobuf request message, POSTs it
    to ``<server_address>/twirp/sources.SourcesService/<Method>`` with the
    ``application/protobuf`` content type, and parses the response body
    into the matching protobuf response message. An HTTP error response
    is raised as a TwirpException.
    """

    def __init__(self, server_address, request_decorator=None):
        """Creates a new client for the SourcesService service.

        Args:
            server_address: The address of the server to send requests to, in
                the full protocol://host:port form. Trailing slashes are
                ignored.
            request_decorator: A function to modify the http request being
                sent. Takes in a urllib.request.Request object and returns the
                same request object
        """
        if sys.version_info[0] > 2:
            target = server_address
        else:
            target = server_address.encode('ascii')
        # Drop trailing slashes so the request path never starts with
        # "//twirp", which would not match the service route.
        self.__target = target.rstrip("/")
        self.__service_name = "sources.SourcesService"
        self.__request_decorator = request_decorator

    def __make_request(self, body, full_method):
        """POST a serialized request and return the raw response body.

        Args:
            body: Serialized protobuf request.
            full_method: Path of the form "/<service>/<Method>".

        Returns:
            The response body as read from the HTTP response.

        Raises:
            TwirpException: If the server answers with an HTTP error.
        """
        req = Request(
            url=self.__target + "/twirp" + full_method,
            data=body,
            headers={"Content-Type": "application/protobuf"},
        )
        if self.__request_decorator:
            req = self.__request_decorator(req)
        try:
            resp = urlopen(req)
        except HTTPError as err:
            raise TwirpException.from_http_err(err)
        # closing() releases the connection even if read() fails, and works
        # on both Python 2 and 3 response objects.
        with closing(resp):
            return resp.read()

    def __invoke(self, method_name, request_symbol, response_symbol, request):
        """Run one RPC: serialize, send, and deserialize.

        Args:
            method_name: RPC method name as used in the URL path.
            request_symbol: Full name of the request message type.
            response_symbol: Full name of the response message type.
            request: The request message instance to send.

        Returns:
            The parsed response message.
        """
        serialize = _sym_db.GetSymbol(request_symbol).SerializeToString
        deserialize = _sym_db.GetSymbol(response_symbol).FromString
        full_method = "/{}/{}".format(self.__service_name, method_name)
        body = serialize(request)
        resp_str = self.__make_request(body=body, full_method=full_method)
        return deserialize(resp_str)

    # ======= Sources ===========

    def create_source(self, create_source_request):
        """Create the source.

        Sends a sources.CreateSourceRequest and returns a
        sources.CreateSourceResponse.
        """
        return self.__invoke(
            "CreateSource",
            "sources.CreateSourceRequest",
            "sources.CreateSourceResponse",
            create_source_request,
        )

    def list_sources(self, list_sources_request):
        """List all sources.

        Sends a sources.ListSourcesRequest and returns a
        sources.ListSourcesResponse.
        """
        return self.__invoke(
            "ListSources",
            "sources.ListSourcesRequest",
            "sources.ListSourcesResponse",
            list_sources_request,
        )

    def get_source(self, get_source_request):
        """Get a specific source.

        Sends a sources.GetSourceRequest and returns a
        sources.GetSourceResponse.
        """
        return self.__invoke(
            "GetSource",
            "sources.GetSourceRequest",
            "sources.GetSourceResponse",
            get_source_request,
        )

    def update_source(self, update_source_request):
        """Update a source.

        Sends a sources.UpdateSourceRequest and returns a
        sources.UpdateSourceResponse.
        """
        return self.__invoke(
            "UpdateSource",
            "sources.UpdateSourceRequest",
            "sources.UpdateSourceResponse",
            update_source_request,
        )

    def delete_source(self, delete_source_request):
        """Delete a source.

        Sends a sources.DeleteSourceRequest and returns a
        sources.DeleteSourceResponse.
        """
        return self.__invoke(
            "DeleteSource",
            "sources.DeleteSourceRequest",
            "sources.DeleteSourceResponse",
            delete_source_request,
        )

    def get_import_lite_signed_url(self, get_import_lite_signed_url_request):
        """Get, or create and return, a source for an organization's import lite.

        Sends a sources.GetImportLiteSignedUrlRequest and returns a
        sources.GetImportLiteSignedUrlResponse.
        """
        return self.__invoke(
            "GetImportLiteSignedUrl",
            "sources.GetImportLiteSignedUrlRequest",
            "sources.GetImportLiteSignedUrlResponse",
            get_import_lite_signed_url_request,
        )

    # ========= Credentials ==============

    def create_credential(self, create_credential_request):
        """Create a new credential.

        Sends a sources.CreateCredentialRequest and returns a
        sources.CreateCredentialResponse.
        """
        return self.__invoke(
            "CreateCredential",
            "sources.CreateCredentialRequest",
            "sources.CreateCredentialResponse",
            create_credential_request,
        )

    def list_credentials(self, list_credentials_request):
        """List all credentials.

        Sends a sources.ListCredentialsRequest and returns a
        sources.ListCredentialsResponse.
        """
        return self.__invoke(
            "ListCredentials",
            "sources.ListCredentialsRequest",
            "sources.ListCredentialsResponse",
            list_credentials_request,
        )

    def get_credential(self, get_credential_request):
        """Get a specific credential.

        Sends a sources.GetCredentialRequest and returns a
        sources.GetCredentialResponse.
        """
        return self.__invoke(
            "GetCredential",
            "sources.GetCredentialRequest",
            "sources.GetCredentialResponse",
            get_credential_request,
        )

    def update_credential(self, update_credential_request):
        """Update a credential.

        Sends a sources.UpdateCredentialRequest and returns a
        sources.UpdateCredentialResponse.
        """
        return self.__invoke(
            "UpdateCredential",
            "sources.UpdateCredentialRequest",
            "sources.UpdateCredentialResponse",
            update_credential_request,
        )

    def delete_credential(self, delete_credential_request):
        """Delete a credential.

        Sends a sources.DeleteCredentialRequest and returns a
        sources.DeleteCredentialResponse.
        """
        return self.__invoke(
            "DeleteCredential",
            "sources.DeleteCredentialRequest",
            "sources.DeleteCredentialResponse",
            delete_credential_request,
        )

    # ============ Warehouses ====================

    def list_warehouses(self, list_warehouses_request):
        """Call the ListWarehouses RPC.

        Sends a sources.ListWarehousesRequest and returns a
        sources.ListWarehousesResponse.
        """
        return self.__invoke(
            "ListWarehouses",
            "sources.ListWarehousesRequest",
            "sources.ListWarehousesResponse",
            list_warehouses_request,
        )

    # ============= Health check ================

    def healthz(self, empty):
        """Call the Healthz RPC.

        Sends a sources.Empty message and returns a sources.Empty message.
        """
        return self.__invoke(
            "Healthz",
            "sources.Empty",
            "sources.Empty",
            empty,
        )