Repository URL to install this package:
|
Version:
4.1.0 ▾
|
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
from bosdyn.api import data_acquisition_pb2 as bosdyn_dot_api_dot_data__acquisition__pb2
class DataAcquisitionPluginServiceStub(object):
    """Client stub for the bosdyn.api.DataAcquisitionPluginService gRPC service.

    The DataAcquisitionPluginService is implemented by a payload developer to
    retrieve data from a sensor (or, more generally, to perform some payload
    action) and optionally store that data on the robot via the
    DataAcquisitionStore service.

    After construction, the stub exposes one attribute per RPC
    (AcquirePluginData, GetStatus, GetServiceInfo, CancelAcquisition,
    GetLiveData), each holding the object returned by
    ``channel.unary_unary`` for that method.
    """

    def __init__(self, channel):
        """Create the unary-unary callables for every RPC of the service.

        Args:
            channel: A grpc.Channel.
        """
        # Short local alias for the generated protobuf message module.
        pb2 = bosdyn_dot_api_dot_data__acquisition__pb2

        self.AcquirePluginData = channel.unary_unary(
            '/bosdyn.api.DataAcquisitionPluginService/AcquirePluginData',
            request_serializer=pb2.AcquirePluginDataRequest.SerializeToString,
            response_deserializer=pb2.AcquirePluginDataResponse.FromString,
        )

        self.GetStatus = channel.unary_unary(
            '/bosdyn.api.DataAcquisitionPluginService/GetStatus',
            request_serializer=pb2.GetStatusRequest.SerializeToString,
            response_deserializer=pb2.GetStatusResponse.FromString,
        )

        self.GetServiceInfo = channel.unary_unary(
            '/bosdyn.api.DataAcquisitionPluginService/GetServiceInfo',
            request_serializer=pb2.GetServiceInfoRequest.SerializeToString,
            response_deserializer=pb2.GetServiceInfoResponse.FromString,
        )

        self.CancelAcquisition = channel.unary_unary(
            '/bosdyn.api.DataAcquisitionPluginService/CancelAcquisition',
            request_serializer=pb2.CancelAcquisitionRequest.SerializeToString,
            response_deserializer=pb2.CancelAcquisitionResponse.FromString,
        )

        # Note: the live-data messages are named LiveDataRequest/LiveDataResponse
        # (no "Get" prefix), unlike the RPC itself.
        self.GetLiveData = channel.unary_unary(
            '/bosdyn.api.DataAcquisitionPluginService/GetLiveData',
            request_serializer=pb2.LiveDataRequest.SerializeToString,
            response_deserializer=pb2.LiveDataResponse.FromString,
        )
class DataAcquisitionPluginServiceServicer(object):
    """Server-side handler set for the bosdyn.api.DataAcquisitionPluginService.

    The DataAcquisitionPluginService is implemented by a payload developer to
    retrieve data from a sensor (or, more generally, to perform some payload
    action) and optionally store that data on the robot via the
    DataAcquisitionStore service.

    Every handler defined here marks the call as UNIMPLEMENTED on the supplied
    context and then raises NotImplementedError.
    """

    def AcquirePluginData(self, request, context):
        """Trigger a data acquisition to save metadata and non-image data to the data buffer.

        Sent by the main Data Acquisition service as a result of a data
        acquisition request from the tablet or a client.

        Raises:
            NotImplementedError: Always, after flagging the context UNIMPLEMENTED.
        """
        details = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(details)
        raise NotImplementedError(details)

    def GetStatus(self, request, context):
        """Query the status of a data acquisition.

        Raises:
            NotImplementedError: Always, after flagging the context UNIMPLEMENTED.
        """
        details = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(details)
        raise NotImplementedError(details)

    def GetServiceInfo(self, request, context):
        """Get information from a Data Acquisition service; lists acquisition capabilities.

        Raises:
            NotImplementedError: Always, after flagging the context UNIMPLEMENTED.
        """
        details = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(details)
        raise NotImplementedError(details)

    def CancelAcquisition(self, request, context):
        """Cancel an in-progress data acquisition.

        Raises:
            NotImplementedError: Always, after flagging the context UNIMPLEMENTED.
        """
        details = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(details)
        raise NotImplementedError(details)

    def GetLiveData(self, request, context):
        """Request live data available from this plugin during teleoperation.

        Please use the other RPCs for typical data acquisition.

        Raises:
            NotImplementedError: Always, after flagging the context UNIMPLEMENTED.
        """
        details = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(details)
        raise NotImplementedError(details)
def add_DataAcquisitionPluginServiceServicer_to_server(servicer, server):
    """Register ``servicer`` on ``server`` for the DataAcquisitionPluginService.

    A unary-unary method handler is created for each RPC of the service, the
    handlers are grouped into one generic handler under the service name
    'bosdyn.api.DataAcquisitionPluginService', and that generic handler is
    added to ``server``.

    Args:
        servicer: Object providing the AcquirePluginData, GetStatus,
            GetServiceInfo, CancelAcquisition and GetLiveData methods.
        server: Object exposing ``add_generic_rpc_handlers``.
    """
    # Short local aliases; the referenced objects are unchanged.
    pb2 = bosdyn_dot_api_dot_data__acquisition__pb2
    unary_handler = grpc.unary_unary_rpc_method_handler

    # RPC name -> handler pairing the servicer method with its (de)serializers.
    handlers_by_rpc = {
        'AcquirePluginData': unary_handler(
            servicer.AcquirePluginData,
            request_deserializer=pb2.AcquirePluginDataRequest.FromString,
            response_serializer=pb2.AcquirePluginDataResponse.SerializeToString,
        ),
        'GetStatus': unary_handler(
            servicer.GetStatus,
            request_deserializer=pb2.GetStatusRequest.FromString,
            response_serializer=pb2.GetStatusResponse.SerializeToString,
        ),
        'GetServiceInfo': unary_handler(
            servicer.GetServiceInfo,
            request_deserializer=pb2.GetServiceInfoRequest.FromString,
            response_serializer=pb2.GetServiceInfoResponse.SerializeToString,
        ),
        'CancelAcquisition': unary_handler(
            servicer.CancelAcquisition,
            request_deserializer=pb2.CancelAcquisitionRequest.FromString,
            response_serializer=pb2.CancelAcquisitionResponse.SerializeToString,
        ),
        'GetLiveData': unary_handler(
            servicer.GetLiveData,
            request_deserializer=pb2.LiveDataRequest.FromString,
            response_serializer=pb2.LiveDataResponse.SerializeToString,
        ),
    }

    service_handler = grpc.method_handlers_generic_handler(
        'bosdyn.api.DataAcquisitionPluginService', handlers_by_rpc)
    # The server API takes a sequence of generic handlers, hence the 1-tuple.
    server.add_generic_rpc_handlers((service_handler,))
# This class is part of an EXPERIMENTAL API.
class DataAcquisitionPluginService(object):
    """Static convenience callers for the bosdyn.api.DataAcquisitionPluginService.

    The DataAcquisitionPluginService is implemented by a payload developer to
    retrieve data from a sensor (or, more generally, to perform some payload
    action) and optionally store that data on the robot via the
    DataAcquisitionStore service.

    Each static method forwards its arguments unchanged to
    ``grpc.experimental.unary_unary`` together with the RPC's method path and
    the matching request serializer / response deserializer.
    """

    @staticmethod
    def AcquirePluginData(request, target, options=(), channel_credentials=None,
                          call_credentials=None, insecure=False, compression=None,
                          wait_for_ready=None, timeout=None, metadata=None):
        """Invoke the AcquirePluginData RPC via ``grpc.experimental.unary_unary``.

        All arguments are passed through as-is; returns whatever
        ``grpc.experimental.unary_unary`` returns.
        """
        pb2 = bosdyn_dot_api_dot_data__acquisition__pb2
        return grpc.experimental.unary_unary(
            request,
            target,
            '/bosdyn.api.DataAcquisitionPluginService/AcquirePluginData',
            request_serializer=pb2.AcquirePluginDataRequest.SerializeToString,
            response_deserializer=pb2.AcquirePluginDataResponse.FromString,
            options=options,
            channel_credentials=channel_credentials,
            insecure=insecure,
            call_credentials=call_credentials,
            compression=compression,
            wait_for_ready=wait_for_ready,
            timeout=timeout,
            metadata=metadata,
        )

    @staticmethod
    def GetStatus(request, target, options=(), channel_credentials=None,
                  call_credentials=None, insecure=False, compression=None,
                  wait_for_ready=None, timeout=None, metadata=None):
        """Invoke the GetStatus RPC via ``grpc.experimental.unary_unary``.

        All arguments are passed through as-is; returns whatever
        ``grpc.experimental.unary_unary`` returns.
        """
        pb2 = bosdyn_dot_api_dot_data__acquisition__pb2
        return grpc.experimental.unary_unary(
            request,
            target,
            '/bosdyn.api.DataAcquisitionPluginService/GetStatus',
            request_serializer=pb2.GetStatusRequest.SerializeToString,
            response_deserializer=pb2.GetStatusResponse.FromString,
            options=options,
            channel_credentials=channel_credentials,
            insecure=insecure,
            call_credentials=call_credentials,
            compression=compression,
            wait_for_ready=wait_for_ready,
            timeout=timeout,
            metadata=metadata,
        )

    @staticmethod
    def GetServiceInfo(request, target, options=(), channel_credentials=None,
                       call_credentials=None, insecure=False, compression=None,
                       wait_for_ready=None, timeout=None, metadata=None):
        """Invoke the GetServiceInfo RPC via ``grpc.experimental.unary_unary``.

        All arguments are passed through as-is; returns whatever
        ``grpc.experimental.unary_unary`` returns.
        """
        pb2 = bosdyn_dot_api_dot_data__acquisition__pb2
        return grpc.experimental.unary_unary(
            request,
            target,
            '/bosdyn.api.DataAcquisitionPluginService/GetServiceInfo',
            request_serializer=pb2.GetServiceInfoRequest.SerializeToString,
            response_deserializer=pb2.GetServiceInfoResponse.FromString,
            options=options,
            channel_credentials=channel_credentials,
            insecure=insecure,
            call_credentials=call_credentials,
            compression=compression,
            wait_for_ready=wait_for_ready,
            timeout=timeout,
            metadata=metadata,
        )

    @staticmethod
    def CancelAcquisition(request, target, options=(), channel_credentials=None,
                          call_credentials=None, insecure=False, compression=None,
                          wait_for_ready=None, timeout=None, metadata=None):
        """Invoke the CancelAcquisition RPC via ``grpc.experimental.unary_unary``.

        All arguments are passed through as-is; returns whatever
        ``grpc.experimental.unary_unary`` returns.
        """
        pb2 = bosdyn_dot_api_dot_data__acquisition__pb2
        return grpc.experimental.unary_unary(
            request,
            target,
            '/bosdyn.api.DataAcquisitionPluginService/CancelAcquisition',
            request_serializer=pb2.CancelAcquisitionRequest.SerializeToString,
            response_deserializer=pb2.CancelAcquisitionResponse.FromString,
            options=options,
            channel_credentials=channel_credentials,
            insecure=insecure,
            call_credentials=call_credentials,
            compression=compression,
            wait_for_ready=wait_for_ready,
            timeout=timeout,
            metadata=metadata,
        )

    @staticmethod
    def GetLiveData(request, target, options=(), channel_credentials=None,
                    call_credentials=None, insecure=False, compression=None,
                    wait_for_ready=None, timeout=None, metadata=None):
        """Invoke the GetLiveData RPC via ``grpc.experimental.unary_unary``.

        All arguments are passed through as-is; returns whatever
        ``grpc.experimental.unary_unary`` returns.
        """
        pb2 = bosdyn_dot_api_dot_data__acquisition__pb2
        return grpc.experimental.unary_unary(
            request,
            target,
            '/bosdyn.api.DataAcquisitionPluginService/GetLiveData',
            request_serializer=pb2.LiveDataRequest.SerializeToString,
            response_deserializer=pb2.LiveDataResponse.FromString,
            options=options,
            channel_credentials=channel_credentials,
            insecure=insecure,
            call_credentials=call_credentials,
            compression=compression,
            wait_for_ready=wait_for_ready,
            timeout=timeout,
            metadata=metadata,
        )