# Package-index page header (not part of the module source):
#   Repository URL to install this package: (URL not captured)
#   Version: 1.0.9
#   Path: namara-er/er_service_twirp.py
# -*- coding: utf-8 -*-
# Generated by https://github.com/verloop/twirpy/protoc-gen-twirpy. DO NOT EDIT!
# source: rpc/er_service.proto
from google.protobuf import symbol_database as _symbol_database
from twirp.base import Endpoint
from twirp.server import TwirpServer
from twirp.client import TwirpClient
# Default protobuf symbol database; the server and client classes below call
# ``_sym_db.GetSymbol("er_service.<Name>")`` to resolve the request/response
# symbols by their fully-qualified names.
_sym_db = _symbol_database.Default()
class ErServiceServer(TwirpServer):
    """Twirp server wiring for the ``er_service.ErService`` RPCs.

    One ``Endpoint`` is registered per RPC. Each endpoint binds the
    same-named attribute of ``service`` as its handler and resolves its
    input/output symbols from the protobuf symbol database.
    """

    def __init__(self, *args, service, server_path_prefix="/twirp"):
        """Build the endpoint map for ``service``.

        Args:
            *args: Accepted but not used by this constructor.
            service: Object providing an attribute for every RPC name listed
                below (looked up with ``getattr``; a missing attribute raises
                ``AttributeError``).
            server_path_prefix: Prefix placed before
                ``/er_service.ErService`` when building ``self._prefix``.
        """
        super().__init__(service=service)
        self._prefix = f"{server_path_prefix}/er_service.ErService"

        # (RPC name, request symbol, response symbol), in registration order.
        rpc_table = (
            (
                "CreateMatchGroup",
                "er_service.CreateMatchGroupRequest",
                "er_service.CreateMatchGroupResponse",
            ),
            (
                "GetEntities",
                "er_service.GetEntitiesRequest",
                "er_service.GetEntitiesResponse",
            ),
            (
                "LinkEntities",
                "er_service.LinkEntitiesRequest",
                "er_service.LinkEntitiesResponse",
            ),
            (
                "Healthz",
                "er_service.HealthzRequest",
                # NOTE(review): every other RPC pairs <Name>Request with
                # <Name>Response, but Healthz returns GetEntitiesResponse.
                # Confirm against rpc/er_service.proto before changing.
                "er_service.GetEntitiesResponse",
            ),
            (
                "CreateBatchJob",
                # NOTE(review): input is GetEntitiesRequest rather than a
                # CreateBatchJobRequest. Confirm against
                # rpc/er_service.proto before changing.
                "er_service.GetEntitiesRequest",
                "er_service.CreateBatchJobResponse",
            ),
            (
                "GetBatchJob",
                "er_service.GetBatchJobRequest",
                "er_service.GetBatchJobResponse",
            ),
        )

        self._endpoints = {
            rpc_name: Endpoint(
                service_name="ErService",
                name=rpc_name,
                function=getattr(service, rpc_name),
                input=_sym_db.GetSymbol(request_symbol),
                output=_sym_db.GetSymbol(response_symbol),
            )
            for rpc_name, request_symbol, response_symbol in rpc_table
        }
class ErServiceClient(TwirpClient):
    """Twirp client for the ``er_service.ErService`` RPCs.

    Every method builds the RPC URL from ``server_path_prefix``, resolves the
    response symbol from the protobuf symbol database, and delegates to
    ``self._make_request``, returning whatever that call returns.

    Shared parameters:
        *args: Accepted but not used.
        ctx: Forwarded to ``_make_request`` as ``ctx``.
        request: Forwarded to ``_make_request`` as ``request``.
        server_path_prefix: Prefix placed before
            ``/er_service.ErService/<RpcName>`` in the URL.
        **kwargs: Forwarded unchanged to ``_make_request``.
    """

    def CreateMatchGroup(self, *args, ctx, request, server_path_prefix="/twirp", **kwargs):
        """Call the ``CreateMatchGroup`` RPC."""
        rpc_url = f"{server_path_prefix}/er_service.ErService/CreateMatchGroup"
        response_symbol = _sym_db.GetSymbol("er_service.CreateMatchGroupResponse")
        return self._make_request(
            url=rpc_url,
            ctx=ctx,
            request=request,
            response_obj=response_symbol,
            **kwargs,
        )

    def GetEntities(self, *args, ctx, request, server_path_prefix="/twirp", **kwargs):
        """Call the ``GetEntities`` RPC."""
        rpc_url = f"{server_path_prefix}/er_service.ErService/GetEntities"
        response_symbol = _sym_db.GetSymbol("er_service.GetEntitiesResponse")
        return self._make_request(
            url=rpc_url,
            ctx=ctx,
            request=request,
            response_obj=response_symbol,
            **kwargs,
        )

    def LinkEntities(self, *args, ctx, request, server_path_prefix="/twirp", **kwargs):
        """Call the ``LinkEntities`` RPC."""
        rpc_url = f"{server_path_prefix}/er_service.ErService/LinkEntities"
        response_symbol = _sym_db.GetSymbol("er_service.LinkEntitiesResponse")
        return self._make_request(
            url=rpc_url,
            ctx=ctx,
            request=request,
            response_obj=response_symbol,
            **kwargs,
        )

    def Healthz(self, *args, ctx, request, server_path_prefix="/twirp", **kwargs):
        """Call the ``Healthz`` RPC."""
        rpc_url = f"{server_path_prefix}/er_service.ErService/Healthz"
        # NOTE(review): every other method resolves <Name>Response, but this
        # one resolves GetEntitiesResponse. Confirm against
        # rpc/er_service.proto before changing.
        response_symbol = _sym_db.GetSymbol("er_service.GetEntitiesResponse")
        return self._make_request(
            url=rpc_url,
            ctx=ctx,
            request=request,
            response_obj=response_symbol,
            **kwargs,
        )

    def CreateBatchJob(self, *args, ctx, request, server_path_prefix="/twirp", **kwargs):
        """Call the ``CreateBatchJob`` RPC."""
        rpc_url = f"{server_path_prefix}/er_service.ErService/CreateBatchJob"
        response_symbol = _sym_db.GetSymbol("er_service.CreateBatchJobResponse")
        return self._make_request(
            url=rpc_url,
            ctx=ctx,
            request=request,
            response_obj=response_symbol,
            **kwargs,
        )

    def GetBatchJob(self, *args, ctx, request, server_path_prefix="/twirp", **kwargs):
        """Call the ``GetBatchJob`` RPC."""
        rpc_url = f"{server_path_prefix}/er_service.ErService/GetBatchJob"
        response_symbol = _sym_db.GetSymbol("er_service.GetBatchJobResponse")
        return self._make_request(
            url=rpc_url,
            ctx=ctx,
            request=request,
            response_obj=response_symbol,
            **kwargs,
        )