Repository URL to install this package:
|
Version:
1.0.24 ▾
|
# -*- coding: utf-8 -*-
# Generated by https://github.com/verloop/twirpy/protoc-gen-twirpy. DO NOT EDIT!
# source: warehouses/warehouses.proto
from google.protobuf import symbol_database as _symbol_database
from twirp.base import Endpoint
from twirp.server import TwirpServer
from twirp.client import TwirpClient
_sym_db = _symbol_database.Default()
class WarehousesServiceServer(TwirpServer):
def __init__(self, *args, service):
super().__init__(service=service)
self._prefix = "/twirp/warehouses.WarehousesService"
self._endpoints = {
"ListWarehouses": Endpoint(
service_name="WarehousesService",
name="ListWarehouses",
function=getattr(service, "ListWarehouses"),
input=_sym_db.GetSymbol("warehouses.ListWarehousesRequest"),
output=_sym_db.GetSymbol("warehouses.ListWarehousesResponse"),
),
"ListTables": Endpoint(
service_name="WarehousesService",
name="ListTables",
function=getattr(service, "ListTables"),
input=_sym_db.GetSymbol("warehouses.ListTablesRequest"),
output=_sym_db.GetSymbol("warehouses.ListTablesResponse"),
),
"IntrospectTable": Endpoint(
service_name="WarehousesService",
name="IntrospectTable",
function=getattr(service, "IntrospectTable"),
input=_sym_db.GetSymbol("warehouses.IntrospectTableRequest"),
output=_sym_db.GetSymbol("warehouses.IntrospectTableResponse"),
),
"Ping": Endpoint(
service_name="WarehousesService",
name="Ping",
function=getattr(service, "Ping"),
input=_sym_db.GetSymbol("warehouses.PingRequest"),
output=_sym_db.GetSymbol("warehouses.PingResponse"),
),
"Healthz": Endpoint(
service_name="WarehousesService",
name="Healthz",
function=getattr(service, "Healthz"),
input=_sym_db.GetSymbol("warehouses.Empty"),
output=_sym_db.GetSymbol("warehouses.Empty"),
),
}
class WarehousesServiceClient(TwirpClient):
def ListWarehouses(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/warehouses.WarehousesService/ListWarehouses",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("warehouses.ListWarehousesResponse"),
**kwargs,
)
def ListTables(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/warehouses.WarehousesService/ListTables",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("warehouses.ListTablesResponse"),
**kwargs,
)
def IntrospectTable(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/warehouses.WarehousesService/IntrospectTable",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("warehouses.IntrospectTableResponse"),
**kwargs,
)
def Ping(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/warehouses.WarehousesService/Ping",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("warehouses.PingResponse"),
**kwargs,
)
def Healthz(self, *args, ctx, request, **kwargs):
return self._make_request(
url="/twirp/warehouses.WarehousesService/Healthz",
ctx=ctx,
request=request,
response_obj=_sym_db.GetSymbol("warehouses.Empty"),
**kwargs,
)