Repository URL to install this package:
|
Version:
1.0.24 ▾
|
# -*- coding: utf-8 -*-
# Generated by https://github.com/verloop/twirpy/protoc-gen-twirpy. DO NOT EDIT!
# source: dataspec.proto
from google.protobuf import symbol_database as _symbol_database
from twirp.base import Endpoint
from twirp.server import TwirpServer
from twirp.client import TwirpClient
# Default protobuf symbol database; the classes below call GetSymbol() on it
# with fully-qualified "dataspec.*" message names.
_sym_db = _symbol_database.Default()
class DataspecServiceServer(TwirpServer):
    """Twirp server wiring for ``dataspec.DataspecService``.

    Builds one ``Endpoint`` per RPC and stores them, keyed by RPC name, in
    ``self._endpoints``; the route prefix is stored in ``self._prefix``.
    """

    def __init__(self, *args, service):
        """Register every DataspecService RPC against ``service``.

        Args:
            *args: Accepted but not used by this constructor.
            service: Object providing one attribute per RPC name (looked up
                with ``getattr``); also forwarded to ``TwirpServer.__init__``.

        Raises:
            AttributeError: If ``service`` lacks an attribute for any RPC.
        """
        super().__init__(service=service)
        self._prefix = "/twirp/dataspec.DataspecService"

        # (RPC name, request message symbol, response message symbol).
        # Order matters only in that it fixes the dict's insertion order.
        rpc_table = (
            (
                "GenerateMetrics",
                "dataspec.GenerateMetricsRequest",
                "dataspec.GenerateMetricsResponse",
            ),
            (
                "ListMetrics",
                "dataspec.ListMetricsRequest",
                "dataspec.ListMetricsResponse",
            ),
            (
                "CreateAlert",
                "dataspec.CreateAlertRequest",
                "dataspec.CreateAlertResponse",
            ),
            (
                "ListAlerts",
                "dataspec.ListAlertsRequest",
                "dataspec.ListAlertsResponse",
            ),
            (
                "ListTriggeredAlerts",
                "dataspec.ListTriggeredAlertsRequest",
                "dataspec.ListTriggeredAlertsResponse",
            ),
            (
                "GetAlert",
                "dataspec.GetAlertRequest",
                "dataspec.GetAlertResponse",
            ),
            (
                "UpdateAlert",
                "dataspec.UpdateAlertRequest",
                "dataspec.UpdateAlertResponse",
            ),
            (
                "DeleteAlert",
                "dataspec.DeleteAlertRequest",
                "dataspec.DeleteAlertResponse",
            ),
            # HealthCheck takes and returns the same Empty message.
            (
                "HealthCheck",
                "dataspec.Empty",
                "dataspec.Empty",
            ),
        )

        self._endpoints = {
            rpc_name: Endpoint(
                service_name="DataspecService",
                name=rpc_name,
                function=getattr(service, rpc_name),
                input=_sym_db.GetSymbol(request_symbol),
                output=_sym_db.GetSymbol(response_symbol),
            )
            for rpc_name, request_symbol, response_symbol in rpc_table
        }
class DataspecServiceClient(TwirpClient):
    """Twirp client for ``dataspec.DataspecService``.

    Every method forwards ``ctx``, ``request`` and any extra keyword
    arguments to ``self._make_request`` together with the RPC's URL and the
    response message class, and returns whatever ``_make_request`` returns.
    Positional ``*args`` are accepted but not used.
    """

    def GenerateMetrics(self, *args, ctx, request, **kwargs):
        """Invoke the GenerateMetrics RPC."""
        response_cls = _sym_db.GetSymbol("dataspec.GenerateMetricsResponse")
        return self._make_request(
            url="/twirp/dataspec.DataspecService/GenerateMetrics",
            ctx=ctx,
            request=request,
            response_obj=response_cls,
            **kwargs,
        )

    def ListMetrics(self, *args, ctx, request, **kwargs):
        """Invoke the ListMetrics RPC."""
        response_cls = _sym_db.GetSymbol("dataspec.ListMetricsResponse")
        return self._make_request(
            url="/twirp/dataspec.DataspecService/ListMetrics",
            ctx=ctx,
            request=request,
            response_obj=response_cls,
            **kwargs,
        )

    def CreateAlert(self, *args, ctx, request, **kwargs):
        """Invoke the CreateAlert RPC."""
        response_cls = _sym_db.GetSymbol("dataspec.CreateAlertResponse")
        return self._make_request(
            url="/twirp/dataspec.DataspecService/CreateAlert",
            ctx=ctx,
            request=request,
            response_obj=response_cls,
            **kwargs,
        )

    def ListAlerts(self, *args, ctx, request, **kwargs):
        """Invoke the ListAlerts RPC."""
        response_cls = _sym_db.GetSymbol("dataspec.ListAlertsResponse")
        return self._make_request(
            url="/twirp/dataspec.DataspecService/ListAlerts",
            ctx=ctx,
            request=request,
            response_obj=response_cls,
            **kwargs,
        )

    def ListTriggeredAlerts(self, *args, ctx, request, **kwargs):
        """Invoke the ListTriggeredAlerts RPC."""
        response_cls = _sym_db.GetSymbol("dataspec.ListTriggeredAlertsResponse")
        return self._make_request(
            url="/twirp/dataspec.DataspecService/ListTriggeredAlerts",
            ctx=ctx,
            request=request,
            response_obj=response_cls,
            **kwargs,
        )

    def GetAlert(self, *args, ctx, request, **kwargs):
        """Invoke the GetAlert RPC."""
        response_cls = _sym_db.GetSymbol("dataspec.GetAlertResponse")
        return self._make_request(
            url="/twirp/dataspec.DataspecService/GetAlert",
            ctx=ctx,
            request=request,
            response_obj=response_cls,
            **kwargs,
        )

    def UpdateAlert(self, *args, ctx, request, **kwargs):
        """Invoke the UpdateAlert RPC."""
        response_cls = _sym_db.GetSymbol("dataspec.UpdateAlertResponse")
        return self._make_request(
            url="/twirp/dataspec.DataspecService/UpdateAlert",
            ctx=ctx,
            request=request,
            response_obj=response_cls,
            **kwargs,
        )

    def DeleteAlert(self, *args, ctx, request, **kwargs):
        """Invoke the DeleteAlert RPC."""
        response_cls = _sym_db.GetSymbol("dataspec.DeleteAlertResponse")
        return self._make_request(
            url="/twirp/dataspec.DataspecService/DeleteAlert",
            ctx=ctx,
            request=request,
            response_obj=response_cls,
            **kwargs,
        )

    def HealthCheck(self, *args, ctx, request, **kwargs):
        """Invoke the HealthCheck RPC; the response type is ``dataspec.Empty``."""
        response_cls = _sym_db.GetSymbol("dataspec.Empty")
        return self._make_request(
            url="/twirp/dataspec.DataspecService/HealthCheck",
            ctx=ctx,
            request=request,
            response_obj=response_cls,
            **kwargs,
        )