# Repository URL to install this package: (not included in this copy)
# Version: 1.0.22
# -*- coding: utf-8 -*-
# Generated by https://github.com/verloop/twirpy/protoc-gen-twirpy. DO NOT EDIT!
# source: sources.proto
from google.protobuf import symbol_database as _symbol_database
from twirp.base import Endpoint
from twirp.server import TwirpServer
from twirp.client import TwirpClient
# Default protobuf symbol database. The server and client classes below call
# ``_sym_db.GetSymbol("sources.<Name>")`` on it to resolve the request and
# response symbols for each RPC by full name.
_sym_db = _symbol_database.Default()
class SourcesServiceServer(TwirpServer):
    """Twirp server binding for ``sources.SourcesService``.

    Builds one ``Endpoint`` per RPC, each wired to the attribute of the
    supplied service object that carries the same name as the RPC.
    """

    def __init__(self, *args, service):
        """Set the route prefix and populate the endpoint table.

        Args:
            *args: Accepted for signature compatibility; not used here.
            service: Object exposing one attribute per RPC name
                (``CreateSource``, ``ListSources``, ...). It is also
                forwarded to the base-class constructor as ``service=``.

        Raises:
            AttributeError: If ``service`` lacks an attribute for any RPC
                listed below (``getattr`` is called without a default).
        """
        super().__init__(service=service)
        self._prefix = "/twirp/sources.SourcesService"

        # (rpc name, request symbol, response symbol), in registration order.
        rpc_table = (
            (
                "CreateSource",
                "sources.CreateSourceRequest",
                "sources.CreateSourceResponse",
            ),
            (
                "ListSources",
                "sources.ListSourcesRequest",
                "sources.ListSourcesResponse",
            ),
            (
                "GetSource",
                "sources.GetSourceRequest",
                "sources.GetSourceResponse",
            ),
            (
                "UpdateSource",
                "sources.UpdateSourceRequest",
                "sources.UpdateSourceResponse",
            ),
            (
                "DeleteSource",
                "sources.DeleteSourceRequest",
                "sources.DeleteSourceResponse",
            ),
            (
                "GetImportLiteSignedUrl",
                "sources.GetImportLiteSignedUrlRequest",
                "sources.GetImportLiteSignedUrlResponse",
            ),
            (
                "CreateCredential",
                "sources.CreateCredentialRequest",
                "sources.CreateCredentialResponse",
            ),
            (
                "ListCredentials",
                "sources.ListCredentialsRequest",
                "sources.ListCredentialsResponse",
            ),
            (
                "GetCredential",
                "sources.GetCredentialRequest",
                "sources.GetCredentialResponse",
            ),
            (
                "UpdateCredential",
                "sources.UpdateCredentialRequest",
                "sources.UpdateCredentialResponse",
            ),
            (
                "DeleteCredential",
                "sources.DeleteCredentialRequest",
                "sources.DeleteCredentialResponse",
            ),
            (
                "ListWarehouses",
                "sources.ListWarehousesRequest",
                "sources.ListWarehousesResponse",
            ),
            # Healthz uses the same symbol for request and response.
            ("Healthz", "sources.Empty", "sources.Empty"),
        )

        endpoints = {}
        for rpc_name, request_symbol, response_symbol in rpc_table:
            endpoints[rpc_name] = Endpoint(
                service_name="SourcesService",
                name=rpc_name,
                function=getattr(service, rpc_name),
                input=_sym_db.GetSymbol(request_symbol),
                output=_sym_db.GetSymbol(response_symbol),
            )
        # Assigned only after every endpoint was built successfully.
        self._endpoints = endpoints
class SourcesServiceClient(TwirpClient):
    """Twirp client for ``sources.SourcesService``.

    Every method takes keyword-only ``ctx`` and ``request``, delegates to
    ``self._make_request`` with the RPC's route under
    ``/twirp/sources.SourcesService/`` and the response symbol resolved from
    the symbol database, and returns whatever ``_make_request`` returns.
    Extra keyword arguments are forwarded unchanged; positional arguments are
    accepted but not used.
    """

    def CreateSource(self, *args, ctx, request, **kwargs):
        """Call the ``CreateSource`` RPC and return the result."""
        send = self._make_request
        response_type = _sym_db.GetSymbol("sources.CreateSourceResponse")
        return send(
            url="/twirp/sources.SourcesService/CreateSource",
            ctx=ctx,
            request=request,
            response_obj=response_type,
            **kwargs,
        )

    def ListSources(self, *args, ctx, request, **kwargs):
        """Call the ``ListSources`` RPC and return the result."""
        send = self._make_request
        response_type = _sym_db.GetSymbol("sources.ListSourcesResponse")
        return send(
            url="/twirp/sources.SourcesService/ListSources",
            ctx=ctx,
            request=request,
            response_obj=response_type,
            **kwargs,
        )

    def GetSource(self, *args, ctx, request, **kwargs):
        """Call the ``GetSource`` RPC and return the result."""
        send = self._make_request
        response_type = _sym_db.GetSymbol("sources.GetSourceResponse")
        return send(
            url="/twirp/sources.SourcesService/GetSource",
            ctx=ctx,
            request=request,
            response_obj=response_type,
            **kwargs,
        )

    def UpdateSource(self, *args, ctx, request, **kwargs):
        """Call the ``UpdateSource`` RPC and return the result."""
        send = self._make_request
        response_type = _sym_db.GetSymbol("sources.UpdateSourceResponse")
        return send(
            url="/twirp/sources.SourcesService/UpdateSource",
            ctx=ctx,
            request=request,
            response_obj=response_type,
            **kwargs,
        )

    def DeleteSource(self, *args, ctx, request, **kwargs):
        """Call the ``DeleteSource`` RPC and return the result."""
        send = self._make_request
        response_type = _sym_db.GetSymbol("sources.DeleteSourceResponse")
        return send(
            url="/twirp/sources.SourcesService/DeleteSource",
            ctx=ctx,
            request=request,
            response_obj=response_type,
            **kwargs,
        )

    def GetImportLiteSignedUrl(self, *args, ctx, request, **kwargs):
        """Call the ``GetImportLiteSignedUrl`` RPC and return the result."""
        send = self._make_request
        response_type = _sym_db.GetSymbol(
            "sources.GetImportLiteSignedUrlResponse"
        )
        return send(
            url="/twirp/sources.SourcesService/GetImportLiteSignedUrl",
            ctx=ctx,
            request=request,
            response_obj=response_type,
            **kwargs,
        )

    def CreateCredential(self, *args, ctx, request, **kwargs):
        """Call the ``CreateCredential`` RPC and return the result."""
        send = self._make_request
        response_type = _sym_db.GetSymbol("sources.CreateCredentialResponse")
        return send(
            url="/twirp/sources.SourcesService/CreateCredential",
            ctx=ctx,
            request=request,
            response_obj=response_type,
            **kwargs,
        )

    def ListCredentials(self, *args, ctx, request, **kwargs):
        """Call the ``ListCredentials`` RPC and return the result."""
        send = self._make_request
        response_type = _sym_db.GetSymbol("sources.ListCredentialsResponse")
        return send(
            url="/twirp/sources.SourcesService/ListCredentials",
            ctx=ctx,
            request=request,
            response_obj=response_type,
            **kwargs,
        )

    def GetCredential(self, *args, ctx, request, **kwargs):
        """Call the ``GetCredential`` RPC and return the result."""
        send = self._make_request
        response_type = _sym_db.GetSymbol("sources.GetCredentialResponse")
        return send(
            url="/twirp/sources.SourcesService/GetCredential",
            ctx=ctx,
            request=request,
            response_obj=response_type,
            **kwargs,
        )

    def UpdateCredential(self, *args, ctx, request, **kwargs):
        """Call the ``UpdateCredential`` RPC and return the result."""
        send = self._make_request
        response_type = _sym_db.GetSymbol("sources.UpdateCredentialResponse")
        return send(
            url="/twirp/sources.SourcesService/UpdateCredential",
            ctx=ctx,
            request=request,
            response_obj=response_type,
            **kwargs,
        )

    def DeleteCredential(self, *args, ctx, request, **kwargs):
        """Call the ``DeleteCredential`` RPC and return the result."""
        send = self._make_request
        response_type = _sym_db.GetSymbol("sources.DeleteCredentialResponse")
        return send(
            url="/twirp/sources.SourcesService/DeleteCredential",
            ctx=ctx,
            request=request,
            response_obj=response_type,
            **kwargs,
        )

    def ListWarehouses(self, *args, ctx, request, **kwargs):
        """Call the ``ListWarehouses`` RPC and return the result."""
        send = self._make_request
        response_type = _sym_db.GetSymbol("sources.ListWarehousesResponse")
        return send(
            url="/twirp/sources.SourcesService/ListWarehouses",
            ctx=ctx,
            request=request,
            response_obj=response_type,
            **kwargs,
        )

    def Healthz(self, *args, ctx, request, **kwargs):
        """Call the ``Healthz`` RPC and return the result."""
        send = self._make_request
        response_type = _sym_db.GetSymbol("sources.Empty")
        return send(
            url="/twirp/sources.SourcesService/Healthz",
            ctx=ctx,
            request=request,
            response_obj=response_type,
            **kwargs,
        )