Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
namara-python / namara_python / rpc / sources_twirp.py
Size: Mime:
# -*- coding: utf-8 -*-
# Generated by https://github.com/verloop/twirpy/protoc-gen-twirpy.  DO NOT EDIT!
# source: sources.proto

from google.protobuf import symbol_database as _symbol_database

from twirp.base import Endpoint
from twirp.server import TwirpServer
from twirp.client import TwirpClient

_sym_db = _symbol_database.Default()

class SourcesServiceServer(TwirpServer):

	def __init__(self, *args, service):
		super().__init__(service=service)
		self._prefix = "/twirp/sources.SourcesService"
		self._endpoints = {
			"CreateSource": Endpoint(
				service_name="SourcesService",
				name="CreateSource",
				function=getattr(service, "CreateSource"),
				input=_sym_db.GetSymbol("sources.CreateSourceRequest"),
				output=_sym_db.GetSymbol("sources.CreateSourceResponse"),
			),
			"ListSources": Endpoint(
				service_name="SourcesService",
				name="ListSources",
				function=getattr(service, "ListSources"),
				input=_sym_db.GetSymbol("sources.ListSourcesRequest"),
				output=_sym_db.GetSymbol("sources.ListSourcesResponse"),
			),
			"GetSource": Endpoint(
				service_name="SourcesService",
				name="GetSource",
				function=getattr(service, "GetSource"),
				input=_sym_db.GetSymbol("sources.GetSourceRequest"),
				output=_sym_db.GetSymbol("sources.GetSourceResponse"),
			),
			"UpdateSource": Endpoint(
				service_name="SourcesService",
				name="UpdateSource",
				function=getattr(service, "UpdateSource"),
				input=_sym_db.GetSymbol("sources.UpdateSourceRequest"),
				output=_sym_db.GetSymbol("sources.UpdateSourceResponse"),
			),
			"DeleteSource": Endpoint(
				service_name="SourcesService",
				name="DeleteSource",
				function=getattr(service, "DeleteSource"),
				input=_sym_db.GetSymbol("sources.DeleteSourceRequest"),
				output=_sym_db.GetSymbol("sources.DeleteSourceResponse"),
			),
			"GetImportLiteSignedUrl": Endpoint(
				service_name="SourcesService",
				name="GetImportLiteSignedUrl",
				function=getattr(service, "GetImportLiteSignedUrl"),
				input=_sym_db.GetSymbol("sources.GetImportLiteSignedUrlRequest"),
				output=_sym_db.GetSymbol("sources.GetImportLiteSignedUrlResponse"),
			),
			"CreateCredential": Endpoint(
				service_name="SourcesService",
				name="CreateCredential",
				function=getattr(service, "CreateCredential"),
				input=_sym_db.GetSymbol("sources.CreateCredentialRequest"),
				output=_sym_db.GetSymbol("sources.CreateCredentialResponse"),
			),
			"ListCredentials": Endpoint(
				service_name="SourcesService",
				name="ListCredentials",
				function=getattr(service, "ListCredentials"),
				input=_sym_db.GetSymbol("sources.ListCredentialsRequest"),
				output=_sym_db.GetSymbol("sources.ListCredentialsResponse"),
			),
			"GetCredential": Endpoint(
				service_name="SourcesService",
				name="GetCredential",
				function=getattr(service, "GetCredential"),
				input=_sym_db.GetSymbol("sources.GetCredentialRequest"),
				output=_sym_db.GetSymbol("sources.GetCredentialResponse"),
			),
			"UpdateCredential": Endpoint(
				service_name="SourcesService",
				name="UpdateCredential",
				function=getattr(service, "UpdateCredential"),
				input=_sym_db.GetSymbol("sources.UpdateCredentialRequest"),
				output=_sym_db.GetSymbol("sources.UpdateCredentialResponse"),
			),
			"DeleteCredential": Endpoint(
				service_name="SourcesService",
				name="DeleteCredential",
				function=getattr(service, "DeleteCredential"),
				input=_sym_db.GetSymbol("sources.DeleteCredentialRequest"),
				output=_sym_db.GetSymbol("sources.DeleteCredentialResponse"),
			),
			"ListWarehouses": Endpoint(
				service_name="SourcesService",
				name="ListWarehouses",
				function=getattr(service, "ListWarehouses"),
				input=_sym_db.GetSymbol("sources.ListWarehousesRequest"),
				output=_sym_db.GetSymbol("sources.ListWarehousesResponse"),
			),
			"Healthz": Endpoint(
				service_name="SourcesService",
				name="Healthz",
				function=getattr(service, "Healthz"),
				input=_sym_db.GetSymbol("sources.Empty"),
				output=_sym_db.GetSymbol("sources.Empty"),
			),
		}

class SourcesServiceClient(TwirpClient):

	def CreateSource(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/sources.SourcesService/CreateSource",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("sources.CreateSourceResponse"),
			**kwargs,
		)

	def ListSources(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/sources.SourcesService/ListSources",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("sources.ListSourcesResponse"),
			**kwargs,
		)

	def GetSource(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/sources.SourcesService/GetSource",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("sources.GetSourceResponse"),
			**kwargs,
		)

	def UpdateSource(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/sources.SourcesService/UpdateSource",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("sources.UpdateSourceResponse"),
			**kwargs,
		)

	def DeleteSource(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/sources.SourcesService/DeleteSource",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("sources.DeleteSourceResponse"),
			**kwargs,
		)

	def GetImportLiteSignedUrl(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/sources.SourcesService/GetImportLiteSignedUrl",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("sources.GetImportLiteSignedUrlResponse"),
			**kwargs,
		)

	def CreateCredential(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/sources.SourcesService/CreateCredential",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("sources.CreateCredentialResponse"),
			**kwargs,
		)

	def ListCredentials(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/sources.SourcesService/ListCredentials",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("sources.ListCredentialsResponse"),
			**kwargs,
		)

	def GetCredential(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/sources.SourcesService/GetCredential",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("sources.GetCredentialResponse"),
			**kwargs,
		)

	def UpdateCredential(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/sources.SourcesService/UpdateCredential",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("sources.UpdateCredentialResponse"),
			**kwargs,
		)

	def DeleteCredential(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/sources.SourcesService/DeleteCredential",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("sources.DeleteCredentialResponse"),
			**kwargs,
		)

	def ListWarehouses(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/sources.SourcesService/ListWarehouses",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("sources.ListWarehousesResponse"),
			**kwargs,
		)

	def Healthz(self, *args, ctx, request, **kwargs):
		return self._make_request(
			url="/twirp/sources.SourcesService/Healthz",
			ctx=ctx,
			request=request,
			response_obj=_sym_db.GetSymbol("sources.Empty"),
			**kwargs,
		)