Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
namara-python / rpc / dataspec_twirp.py
Size: Mime:
# -*- coding: utf-8 -*-
# Generated by https://github.com/verloop/twirpy/protoc-gen-twirpy.  DO NOT EDIT!
# source: dataspec.proto

from google.protobuf import symbol_database as _symbol_database

from twirp.base import Endpoint
from twirp.server import TwirpServer
from twirp.client import TwirpClient

_sym_db = _symbol_database.Default()

class DataspecServiceServer(TwirpServer):

	def __init__(self, *args, service):
		super().__init__(service=service)
		self._prefix = "/twirp/dataspec.DataspecService"
		self._endpoints = {
			"GenerateMetrics": Endpoint(
				service_name="DataspecService",
				name="GenerateMetrics",
				function=getattr(service, "GenerateMetrics"),
				input=_sym_db.GetSymbol("dataspec.GenerateMetricsRequest"),
				output=_sym_db.GetSymbol("dataspec.GenerateMetricsResponse"),
			),
			"ListMetrics": Endpoint(
				service_name="DataspecService",
				name="ListMetrics",
				function=getattr(service, "ListMetrics"),
				input=_sym_db.GetSymbol("dataspec.ListMetricsRequest"),
				output=_sym_db.GetSymbol("dataspec.ListMetricsResponse"),
			),
			"CreateAlert": Endpoint(
				service_name="DataspecService",
				name="CreateAlert",
				function=getattr(service, "CreateAlert"),
				input=_sym_db.GetSymbol("dataspec.CreateAlertRequest"),
				output=_sym_db.GetSymbol("dataspec.CreateAlertResponse"),
			),
			"ListAlerts": Endpoint(
				service_name="DataspecService",
				name="ListAlerts",
				function=getattr(service, "ListAlerts"),
				input=_sym_db.GetSymbol("dataspec.ListAlertsRequest"),
				output=_sym_db.GetSymbol("dataspec.ListAlertsResponse"),
			),
			"ListTriggeredAlerts": Endpoint(
				service_name="DataspecService",
				name="ListTriggeredAlerts",
				function=getattr(service, "ListTriggeredAlerts"),
				input=_sym_db.GetSymbol("dataspec.ListTriggeredAlertsRequest"),
				output=_sym_db.GetSymbol("dataspec.ListTriggeredAlertsResponse"),
			),
			"GetAlert": Endpoint(
				service_name="DataspecService",
				name="GetAlert",
				function=getattr(service, "GetAlert"),
				input=_sym_db.GetSymbol("dataspec.GetAlertRequest"),
				output=_sym_db.GetSymbol("dataspec.GetAlertResponse"),
			),
			"UpdateAlert": Endpoint(
				service_name="DataspecService",
				name="UpdateAlert",
				function=getattr(service, "UpdateAlert"),
				input=_sym_db.GetSymbol("dataspec.UpdateAlertRequest"),
				output=_sym_db.GetSymbol("dataspec.UpdateAlertResponse"),
			),
			"DeleteAlert": Endpoint(
				service_name="DataspecService",
				name="DeleteAlert",
				function=getattr(service, "DeleteAlert"),
				input=_sym_db.GetSymbol("dataspec.DeleteAlertRequest"),
				output=_sym_db.GetSymbol("dataspec.DeleteAlertResponse"),
			),
			"HealthCheck": Endpoint(
				service_name="DataspecService",
				name="HealthCheck",
				function=getattr(service, "HealthCheck"),
				input=_sym_db.GetSymbol("dataspec.Empty"),
				output=_sym_db.GetSymbol("dataspec.Empty"),
			),
		}

class DataspecServiceClient(TwirpClient):

	def GenerateMetrics(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/dataspec.DataspecService/GenerateMetrics",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("dataspec.GenerateMetricsResponse"),
			**kwargs,
		)

	def ListMetrics(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/dataspec.DataspecService/ListMetrics",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("dataspec.ListMetricsResponse"),
			**kwargs,
		)

	def CreateAlert(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/dataspec.DataspecService/CreateAlert",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("dataspec.CreateAlertResponse"),
			**kwargs,
		)

	def ListAlerts(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/dataspec.DataspecService/ListAlerts",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("dataspec.ListAlertsResponse"),
			**kwargs,
		)

	def ListTriggeredAlerts(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/dataspec.DataspecService/ListTriggeredAlerts",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("dataspec.ListTriggeredAlertsResponse"),
			**kwargs,
		)

	def GetAlert(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/dataspec.DataspecService/GetAlert",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("dataspec.GetAlertResponse"),
			**kwargs,
		)

	def UpdateAlert(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/dataspec.DataspecService/UpdateAlert",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("dataspec.UpdateAlertResponse"),
			**kwargs,
		)

	def DeleteAlert(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/dataspec.DataspecService/DeleteAlert",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("dataspec.DeleteAlertResponse"),
			**kwargs,
		)

	def HealthCheck(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/dataspec.DataspecService/HealthCheck",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("dataspec.Empty"),
			**kwargs,
		)