Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
namara-python / rpc / category_twirp.py
Size: Mime:
# -*- coding: utf-8 -*-
# Generated by https://github.com/verloop/twirpy/protoc-gen-twirpy.  DO NOT EDIT!
# source: category.proto

from google.protobuf import symbol_database as _symbol_database

from twirp.base import Endpoint
from twirp.server import TwirpServer
from twirp.client import TwirpClient

_sym_db = _symbol_database.Default()

class CategoryServiceServer(TwirpServer):

	def __init__(self, *args, service):
		super().__init__(service=service)
		self._prefix = "/twirp/category.CategoryService"
		self._endpoints = {
			"CreateTopic": Endpoint(
				service_name="CategoryService",
				name="CreateTopic",
				function=getattr(service, "CreateTopic"),
				input=_sym_db.GetSymbol("category.CreateTopicRequest"),
				output=_sym_db.GetSymbol("category.CreateTopicResponse"),
			),
			"GetTopic": Endpoint(
				service_name="CategoryService",
				name="GetTopic",
				function=getattr(service, "GetTopic"),
				input=_sym_db.GetSymbol("category.GetTopicRequest"),
				output=_sym_db.GetSymbol("category.GetTopicResponse"),
			),
			"UpdateTopic": Endpoint(
				service_name="CategoryService",
				name="UpdateTopic",
				function=getattr(service, "UpdateTopic"),
				input=_sym_db.GetSymbol("category.UpdateTopicRequest"),
				output=_sym_db.GetSymbol("category.UpdateTopicResponse"),
			),
			"SetTopic": Endpoint(
				service_name="CategoryService",
				name="SetTopic",
				function=getattr(service, "SetTopic"),
				input=_sym_db.GetSymbol("category.SetTopicRequest"),
				output=_sym_db.GetSymbol("category.SetTopicResponse"),
			),
			"UnsetTopic": Endpoint(
				service_name="CategoryService",
				name="UnsetTopic",
				function=getattr(service, "UnsetTopic"),
				input=_sym_db.GetSymbol("category.UnsetTopicRequest"),
				output=_sym_db.GetSymbol("category.UnsetTopicResponse"),
			),
			"DeleteTopic": Endpoint(
				service_name="CategoryService",
				name="DeleteTopic",
				function=getattr(service, "DeleteTopic"),
				input=_sym_db.GetSymbol("category.DeleteTopicRequest"),
				output=_sym_db.GetSymbol("category.DeleteTopicResponse"),
			),
			"GetTopics": Endpoint(
				service_name="CategoryService",
				name="GetTopics",
				function=getattr(service, "GetTopics"),
				input=_sym_db.GetSymbol("category.GetTopicsRequest"),
				output=_sym_db.GetSymbol("category.GetTopicsResponse"),
			),
			"ListTopics": Endpoint(
				service_name="CategoryService",
				name="ListTopics",
				function=getattr(service, "ListTopics"),
				input=_sym_db.GetSymbol("category.ListTopicsRequest"),
				output=_sym_db.GetSymbol("category.ListTopicsResponse"),
			),
			"ListDatasets": Endpoint(
				service_name="CategoryService",
				name="ListDatasets",
				function=getattr(service, "ListDatasets"),
				input=_sym_db.GetSymbol("category.ListDatasetsRequest"),
				output=_sym_db.GetSymbol("category.ListDatasetsResponse"),
			),
			"GetClassifications": Endpoint(
				service_name="CategoryService",
				name="GetClassifications",
				function=getattr(service, "GetClassifications"),
				input=_sym_db.GetSymbol("category.GetClassificationsRequest"),
				output=_sym_db.GetSymbol("category.GetClassificationsResponse"),
			),
			"Healthz": Endpoint(
				service_name="CategoryService",
				name="Healthz",
				function=getattr(service, "Healthz"),
				input=_sym_db.GetSymbol("category.Empty"),
				output=_sym_db.GetSymbol("category.Empty"),
			),
		}

class CategoryServiceClient(TwirpClient):

	def CreateTopic(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/category.CategoryService/CreateTopic",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("category.CreateTopicResponse"),
			**kwargs,
		)

	def GetTopic(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/category.CategoryService/GetTopic",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("category.GetTopicResponse"),
			**kwargs,
		)

	def UpdateTopic(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/category.CategoryService/UpdateTopic",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("category.UpdateTopicResponse"),
			**kwargs,
		)

	def SetTopic(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/category.CategoryService/SetTopic",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("category.SetTopicResponse"),
			**kwargs,
		)

	def UnsetTopic(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/category.CategoryService/UnsetTopic",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("category.UnsetTopicResponse"),
			**kwargs,
		)

	def DeleteTopic(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/category.CategoryService/DeleteTopic",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("category.DeleteTopicResponse"),
			**kwargs,
		)

	def GetTopics(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/category.CategoryService/GetTopics",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("category.GetTopicsResponse"),
			**kwargs,
		)

	def ListTopics(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/category.CategoryService/ListTopics",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("category.ListTopicsResponse"),
			**kwargs,
		)

	def ListDatasets(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/category.CategoryService/ListDatasets",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("category.ListDatasetsResponse"),
			**kwargs,
		)

	def GetClassifications(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/category.CategoryService/GetClassifications",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("category.GetClassificationsResponse"),
			**kwargs,
		)

	def Healthz(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/category.CategoryService/Healthz",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("category.Empty"),
			**kwargs,
		)