# Repository URL to install this package: (not captured)
# Version: 1.0.24
# -*- coding: utf-8 -*-
# Generated by https://github.com/verloop/twirpy/protoc-gen-twirpy. DO NOT EDIT!
# source: dataspec/dataspec.proto
from google.protobuf import symbol_database as _symbol_database
from twirp.base import Endpoint
from twirp.server import TwirpServer
from twirp.client import TwirpClient
# Default protobuf symbol database; the classes below call GetSymbol() on it
# to resolve "dataspec.*" names for request and response types.
_sym_db = _symbol_database.Default()
class DataspecV2ServiceServer(TwirpServer):
    """Twirp server binding for the ``DataspecV2Service`` RPCs.

    Sets the route prefix to ``/twirp/dataspec.DataspecV2Service`` and
    registers one ``Endpoint`` per RPC, each using the same-named attribute
    of the supplied ``service`` object as its function.
    """

    def __init__(self, *args, service):
        """Build the endpoint table for ``service``.

        Args:
            *args: Accepted but not used.
            service: Object exposing an attribute for every RPC name in the
                table below; ``getattr`` raises ``AttributeError`` if one is
                missing.
        """
        super().__init__(service=service)
        self._prefix = "/twirp/dataspec.DataspecV2Service"

        # (RPC name, request symbol, response symbol), in registration order.
        rpc_table = (
            ("ListMetrics", "dataspec.ListMetricsRequest", "dataspec.ListMetricsResponse"),
            ("CreateRule", "dataspec.CreateRuleRequest", "dataspec.CreateRuleResponse"),
            ("ListRules", "dataspec.ListRulesRequest", "dataspec.ListRulesResponse"),
            ("ListAlerts", "dataspec.ListAlertsRequest", "dataspec.ListAlertsResponse"),
            ("GetRule", "dataspec.GetRuleRequest", "dataspec.GetRuleResponse"),
            ("UpdateRule", "dataspec.UpdateRuleRequest", "dataspec.UpdateRuleResponse"),
            ("DeleteRule", "dataspec.DeleteRuleRequest", "dataspec.DeleteRuleResponse"),
            ("HealthCheck", "dataspec.Empty", "dataspec.Empty"),
        )

        # Keyword order matches the original so lookups happen in the same
        # sequence: service attribute, then request symbol, then response symbol.
        self._endpoints = {
            rpc_name: Endpoint(
                service_name="DataspecV2Service",
                name=rpc_name,
                function=getattr(service, rpc_name),
                input=_sym_db.GetSymbol(request_symbol),
                output=_sym_db.GetSymbol(response_symbol),
            )
            for rpc_name, request_symbol, response_symbol in rpc_table
        }
class DataspecV2ServiceClient(TwirpClient):
    """Twirp client for the ``DataspecV2Service`` RPCs.

    Every method forwards ``ctx``, ``request`` and any extra keyword
    arguments to ``self._make_request`` together with the RPC's URL and the
    response symbol looked up in ``_sym_db``. Positional arguments are
    accepted but not used.
    """

    def ListMetrics(self, *args, ctx, request, **kwargs):
        """Issue the ``ListMetrics`` RPC and return ``_make_request``'s result."""
        endpoint_url = "/twirp/dataspec.DataspecV2Service/ListMetrics"
        response_type = _sym_db.GetSymbol("dataspec.ListMetricsResponse")
        return self._make_request(
            url=endpoint_url, ctx=ctx, request=request, response_obj=response_type, **kwargs
        )

    def CreateRule(self, *args, ctx, request, **kwargs):
        """Issue the ``CreateRule`` RPC and return ``_make_request``'s result."""
        endpoint_url = "/twirp/dataspec.DataspecV2Service/CreateRule"
        response_type = _sym_db.GetSymbol("dataspec.CreateRuleResponse")
        return self._make_request(
            url=endpoint_url, ctx=ctx, request=request, response_obj=response_type, **kwargs
        )

    def ListRules(self, *args, ctx, request, **kwargs):
        """Issue the ``ListRules`` RPC and return ``_make_request``'s result."""
        endpoint_url = "/twirp/dataspec.DataspecV2Service/ListRules"
        response_type = _sym_db.GetSymbol("dataspec.ListRulesResponse")
        return self._make_request(
            url=endpoint_url, ctx=ctx, request=request, response_obj=response_type, **kwargs
        )

    def ListAlerts(self, *args, ctx, request, **kwargs):
        """Issue the ``ListAlerts`` RPC and return ``_make_request``'s result."""
        endpoint_url = "/twirp/dataspec.DataspecV2Service/ListAlerts"
        response_type = _sym_db.GetSymbol("dataspec.ListAlertsResponse")
        return self._make_request(
            url=endpoint_url, ctx=ctx, request=request, response_obj=response_type, **kwargs
        )

    def GetRule(self, *args, ctx, request, **kwargs):
        """Issue the ``GetRule`` RPC and return ``_make_request``'s result."""
        endpoint_url = "/twirp/dataspec.DataspecV2Service/GetRule"
        response_type = _sym_db.GetSymbol("dataspec.GetRuleResponse")
        return self._make_request(
            url=endpoint_url, ctx=ctx, request=request, response_obj=response_type, **kwargs
        )

    def UpdateRule(self, *args, ctx, request, **kwargs):
        """Issue the ``UpdateRule`` RPC and return ``_make_request``'s result."""
        endpoint_url = "/twirp/dataspec.DataspecV2Service/UpdateRule"
        response_type = _sym_db.GetSymbol("dataspec.UpdateRuleResponse")
        return self._make_request(
            url=endpoint_url, ctx=ctx, request=request, response_obj=response_type, **kwargs
        )

    def DeleteRule(self, *args, ctx, request, **kwargs):
        """Issue the ``DeleteRule`` RPC and return ``_make_request``'s result."""
        endpoint_url = "/twirp/dataspec.DataspecV2Service/DeleteRule"
        response_type = _sym_db.GetSymbol("dataspec.DeleteRuleResponse")
        return self._make_request(
            url=endpoint_url, ctx=ctx, request=request, response_obj=response_type, **kwargs
        )

    def HealthCheck(self, *args, ctx, request, **kwargs):
        """Issue the ``HealthCheck`` RPC and return ``_make_request``'s result."""
        endpoint_url = "/twirp/dataspec.DataspecV2Service/HealthCheck"
        response_type = _sym_db.GetSymbol("dataspec.Empty")
        return self._make_request(
            url=endpoint_url, ctx=ctx, request=request, response_obj=response_type, **kwargs
        )