Repository URL to install this package:
|
Version:
1.0.24 ▾
|
# -*- coding: utf-8 -*-
# Generated by https://github.com/verloop/twirpy/protoc-gen-twirpy. DO NOT EDIT!
# source: sources/sources.proto
from google.protobuf import symbol_database as _symbol_database
from twirp.base import Endpoint
from twirp.server import TwirpServer
from twirp.client import TwirpClient
_sym_db = _symbol_database.Default()
class SourcesServiceServer(TwirpServer):
def __init__(self, *args, service):
super().__init__(service=service)
self._prefix = "/twirp/sources.SourcesService"
self._endpoints = {
"CreateSource": Endpoint(
service_name="SourcesService",
name="CreateSource",
function=getattr(service, "CreateSource"),
input=_sym_db.GetSymbol("sources.CreateSourceRequest"),
output=_sym_db.GetSymbol("sources.CreateSourceResponse"),
),
"ListSources": Endpoint(
service_name="SourcesService",
name="ListSources",
function=getattr(service, "ListSources"),
input=_sym_db.GetSymbol("sources.ListSourcesRequest"),
output=_sym_db.GetSymbol("sources.ListSourcesResponse"),
),
"ListSourcesWithConnections": Endpoint(
service_name="SourcesService",
name="ListSourcesWithConnections",
function=getattr(service, "ListSourcesWithConnections"),
input=_sym_db.GetSymbol("sources.ListSourcesRequest"),
output=_sym_db.GetSymbol("sources.ListSourcesWithConnectionsResponse"),
),
"GetSource": Endpoint(
service_name="SourcesService",
name="GetSource",
function=getattr(service, "GetSource"),
input=_sym_db.GetSymbol("sources.GetSourceRequest"),
output=_sym_db.GetSymbol("sources.GetSourceResponse"),
),
"GetSourceWithConnections": Endpoint(
service_name="SourcesService",
name="GetSourceWithConnections",
function=getattr(service, "GetSourceWithConnections"),
input=_sym_db.GetSymbol("sources.GetSourceRequest"),
output=_sym_db.GetSymbol("sources.GetSourceWithConnectionsResponse"),
),
"UpdateSource": Endpoint(
service_name="SourcesService",
name="UpdateSource",
function=getattr(service, "UpdateSource"),
input=_sym_db.GetSymbol("sources.UpdateSourceRequest"),
output=_sym_db.GetSymbol("sources.UpdateSourceResponse"),
),
"DeleteSource": Endpoint(
service_name="SourcesService",
name="DeleteSource",
function=getattr(service, "DeleteSource"),
input=_sym_db.GetSymbol("sources.DeleteSourceRequest"),
output=_sym_db.GetSymbol("sources.DeleteSourceResponse"),
),
"GetImportLiteSignedUrl": Endpoint(
service_name="SourcesService",
name="GetImportLiteSignedUrl",
function=getattr(service, "GetImportLiteSignedUrl"),
input=_sym_db.GetSymbol("sources.GetImportLiteSignedUrlRequest"),
output=_sym_db.GetSymbol("sources.GetImportLiteSignedUrlResponse"),
),
"CreateCredential": Endpoint(
service_name="SourcesService",
name="CreateCredential",
function=getattr(service, "CreateCredential"),
input=_sym_db.GetSymbol("sources.CreateCredentialRequest"),
output=_sym_db.GetSymbol("sources.CreateCredentialResponse"),
),
"ListCredentials": Endpoint(
service_name="SourcesService",
name="ListCredentials",
function=getattr(service, "ListCredentials"),
input=_sym_db.GetSymbol("sources.ListCredentialsRequest"),
output=_sym_db.GetSymbol("sources.ListCredentialsResponse"),
),
"GetCredential": Endpoint(
service_name="SourcesService",
name="GetCredential",
function=getattr(service, "GetCredential"),
input=_sym_db.GetSymbol("sources.GetCredentialRequest"),
output=_sym_db.GetSymbol("sources.GetCredentialResponse"),
),
"UpdateCredential": Endpoint(
service_name="SourcesService",
name="UpdateCredential",
function=getattr(service, "UpdateCredential"),
input=_sym_db.GetSymbol("sources.UpdateCredentialRequest"),
output=_sym_db.GetSymbol("sources.UpdateCredentialResponse"),
),
"DeleteCredential": Endpoint(
service_name="SourcesService",
name="DeleteCredential",
function=getattr(service, "DeleteCredential"),
input=_sym_db.GetSymbol("sources.DeleteCredentialRequest"),
output=_sym_db.GetSymbol("sources.DeleteCredentialResponse"),
),
"CreateConnection": Endpoint(
service_name="SourcesService",
name="CreateConnection",
function=getattr(service, "CreateConnection"),
input=_sym_db.GetSymbol("sources.CreateConnectionRequest"),
output=_sym_db.GetSymbol("sources.CreateConnectionResponse"),
),
"ListConnections": Endpoint(
service_name="SourcesService",
name="ListConnections",
function=getattr(service, "ListConnections"),
input=_sym_db.GetSymbol("sources.ListConnectionsRequest"),
output=_sym_db.GetSymbol("sources.ListConnectionsResponse"),
),
"GetConnection": Endpoint(
service_name="SourcesService",
name="GetConnection",
function=getattr(service, "GetConnection"),
input=_sym_db.GetSymbol("sources.GetConnectionRequest"),
output=_sym_db.GetSymbol("sources.GetConnectionResponse"),
),
"UpdateConnection": Endpoint(
service_name="SourcesService",
name="UpdateConnection",
function=getattr(service, "UpdateConnection"),
input=_sym_db.GetSymbol("sources.UpdateConnectionRequest"),
output=_sym_db.GetSymbol("sources.UpdateConnectionResponse"),
),
"DeleteConnection": Endpoint(
service_name="SourcesService",
name="DeleteConnection",
function=getattr(service, "DeleteConnection"),
input=_sym_db.GetSymbol("sources.DeleteConnectionRequest"),
output=_sym_db.GetSymbol("sources.DeleteConnectionResponse"),
),
"ListWarehouses": Endpoint(
service_name="SourcesService",
name="ListWarehouses",
function=getattr(service, "ListWarehouses"),
input=_sym_db.GetSymbol("sources.ListWarehousesRequest"),
output=_sym_db.GetSymbol("sources.ListWarehousesResponse"),
),
"Healthz": Endpoint(
service_name="SourcesService",
name="Healthz",
function=getattr(service, "Healthz"),
input=_sym_db.GetSymbol("sources.Empty"),
output=_sym_db.GetSymbol("sources.Empty"),
),
}
class SourcesServiceClient(TwirpClient):
def CreateSource(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/CreateSource",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.CreateSourceResponse"),
**kwargs,
)
def ListSources(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/ListSources",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.ListSourcesResponse"),
**kwargs,
)
def ListSourcesWithConnections(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/ListSourcesWithConnections",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.ListSourcesWithConnectionsResponse"),
**kwargs,
)
def GetSource(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/GetSource",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.GetSourceResponse"),
**kwargs,
)
def GetSourceWithConnections(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/GetSourceWithConnections",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.GetSourceWithConnectionsResponse"),
**kwargs,
)
def UpdateSource(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/UpdateSource",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.UpdateSourceResponse"),
**kwargs,
)
def DeleteSource(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/DeleteSource",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.DeleteSourceResponse"),
**kwargs,
)
def GetImportLiteSignedUrl(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/GetImportLiteSignedUrl",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.GetImportLiteSignedUrlResponse"),
**kwargs,
)
def CreateCredential(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/CreateCredential",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.CreateCredentialResponse"),
**kwargs,
)
def ListCredentials(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/ListCredentials",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.ListCredentialsResponse"),
**kwargs,
)
def GetCredential(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/GetCredential",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.GetCredentialResponse"),
**kwargs,
)
def UpdateCredential(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/UpdateCredential",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.UpdateCredentialResponse"),
**kwargs,
)
def DeleteCredential(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/DeleteCredential",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.DeleteCredentialResponse"),
**kwargs,
)
def CreateConnection(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/CreateConnection",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.CreateConnectionResponse"),
**kwargs,
)
def ListConnections(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/ListConnections",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.ListConnectionsResponse"),
**kwargs,
)
def GetConnection(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/GetConnection",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.GetConnectionResponse"),
**kwargs,
)
def UpdateConnection(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/UpdateConnection",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.UpdateConnectionResponse"),
**kwargs,
)
def DeleteConnection(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/DeleteConnection",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.DeleteConnectionResponse"),
**kwargs,
)
def ListWarehouses(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/ListWarehouses",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.ListWarehousesResponse"),
**kwargs,
)
def Healthz(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/sources.SourcesService/Healthz",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("sources.Empty"),
**kwargs,
)