Repository URL to install this package:
|
Version:
1.0.24 ▾
|
# -*- coding: utf-8 -*-
# Generated by https://github.com/verloop/twirpy/protoc-gen-twirpy. DO NOT EDIT!
# source: query/query.proto
from google.protobuf import symbol_database as _symbol_database
from twirp.base import Endpoint
from twirp.server import TwirpServer
from twirp.client import TwirpClient
# protobuf's default symbol database; the server and client classes below use
# it to look up message classes by fully-qualified name via GetSymbol().
_sym_db = _symbol_database.Default()
class QueryServiceServer(TwirpServer):
    """Twirp server binding for the ``query.QueryService`` RPC service.

    Registers one ``Endpoint`` per RPC (``Healthz``, ``Warehousez``,
    ``Query``, ``GetMetadata``). Each endpoint is bound to the same-named
    attribute of the supplied ``service`` object, and its request/response
    message classes are looked up in the protobuf symbol database.
    """

    def __init__(self, *args, service, server_path_prefix="/twirp"):
        """Build the routing prefix and endpoint table for ``service``.

        Args:
            *args: Accepted for signature compatibility; not used.
            service: Object exposing ``Healthz``, ``Warehousez``, ``Query``
                and ``GetMetadata`` attributes. A missing attribute raises
                ``AttributeError`` here, because each one is fetched with
                ``getattr`` while the table is built.
            server_path_prefix: URL prefix placed in front of
                ``/query.QueryService``. The default ``"/twirp"`` yields the
                previously hard-coded ``"/twirp/query.QueryService"``; pass
                ``""`` to mount the service without a prefix.
        """
        super().__init__(service=service)
        # The prefix used to be the literal "/twirp"; it is now configurable
        # so the service can be routed under a different (or empty) prefix.
        self._prefix = f"{server_path_prefix}/query.QueryService"
        self._endpoints = {
            "Healthz": Endpoint(
                service_name="QueryService",
                name="Healthz",
                function=getattr(service, "Healthz"),
                input=_sym_db.GetSymbol("query.HealthzRequest"),
                output=_sym_db.GetSymbol("query.HealthzResponse"),
            ),
            "Warehousez": Endpoint(
                service_name="QueryService",
                name="Warehousez",
                function=getattr(service, "Warehousez"),
                input=_sym_db.GetSymbol("query.WarehousezRequest"),
                output=_sym_db.GetSymbol("query.WarehousezResponse"),
            ),
            "Query": Endpoint(
                service_name="QueryService",
                name="Query",
                function=getattr(service, "Query"),
                input=_sym_db.GetSymbol("query.QueryRequest"),
                output=_sym_db.GetSymbol("query.QueryResponse"),
            ),
            "GetMetadata": Endpoint(
                service_name="QueryService",
                name="GetMetadata",
                function=getattr(service, "GetMetadata"),
                input=_sym_db.GetSymbol("query.GetMetadataRequest"),
                output=_sym_db.GetSymbol("query.GetMetadataResponse"),
            ),
        }
class QueryServiceClient(TwirpClient):
    """Twirp client for the ``query.QueryService`` RPC service.

    Every RPC method shares the same keyword-only calling convention and
    forwards to ``self._make_request``:

    Args (all methods):
        *args: Accepted for signature compatibility; not used.
        ctx: Passed through to ``_make_request`` as ``ctx``.
        request: Passed through to ``_make_request`` as ``request``.
        server_path_prefix: URL prefix placed in front of
            ``/query.QueryService/<Method>``. The default ``"/twirp"``
            reproduces the previously hard-coded URLs; it should match the
            prefix the server is mounted under.
        **kwargs: Forwarded unchanged to ``_make_request``.

    Returns (all methods):
        Whatever ``_make_request`` returns for the given response class
        (presumably an instance of that class -- confirm against
        ``TwirpClient``).
    """

    def Healthz(
        self, *args, ctx, request, server_path_prefix="/twirp", **kwargs
    ):
        """Call the ``Healthz`` RPC; expects ``query.HealthzResponse``."""
        return self._make_request(
            url=f"{server_path_prefix}/query.QueryService/Healthz",
            ctx=ctx,
            request=request,
            response_obj=_sym_db.GetSymbol("query.HealthzResponse"),
            **kwargs,
        )

    def Warehousez(
        self, *args, ctx, request, server_path_prefix="/twirp", **kwargs
    ):
        """Call the ``Warehousez`` RPC; expects ``query.WarehousezResponse``."""
        return self._make_request(
            url=f"{server_path_prefix}/query.QueryService/Warehousez",
            ctx=ctx,
            request=request,
            response_obj=_sym_db.GetSymbol("query.WarehousezResponse"),
            **kwargs,
        )

    def Query(
        self, *args, ctx, request, server_path_prefix="/twirp", **kwargs
    ):
        """Call the ``Query`` RPC; expects ``query.QueryResponse``."""
        return self._make_request(
            url=f"{server_path_prefix}/query.QueryService/Query",
            ctx=ctx,
            request=request,
            response_obj=_sym_db.GetSymbol("query.QueryResponse"),
            **kwargs,
        )

    def GetMetadata(
        self, *args, ctx, request, server_path_prefix="/twirp", **kwargs
    ):
        """Call the ``GetMetadata`` RPC; expects ``query.GetMetadataResponse``."""
        return self._make_request(
            url=f"{server_path_prefix}/query.QueryService/GetMetadata",
            ctx=ctx,
            request=request,
            response_obj=_sym_db.GetSymbol("query.GetMetadataResponse"),
            **kwargs,
        )