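# NOTE: the text originally at the top of this file was package-listing page
# residue ("Repository URL to install this package", "Version: 4.1.0"),
# not part of the generated Python module.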
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
from bosdyn.api import data_acquisition_pb2 as bosdyn_dot_api_dot_data__acquisition__pb2
class DataAcquisitionServiceStub(object):
    """Stub exposing the RPCs of bosdyn.api.DataAcquisitionService.

    The DataAcquisitionService is the main data acquisition service run on
    robot, which receives incoming requests and sends queries to all
    directory-registered DataAcquisitionPluginServices.
    """

    def __init__(self, channel):
        """Bind one unary-unary callable per RPC as an attribute of the stub.

        Args:
            channel: A grpc.Channel.
        """
        # Short local alias for the generated protobuf message module.
        pb2 = bosdyn_dot_api_dot_data__acquisition__pb2

        self.AcquireData = channel.unary_unary(
            '/bosdyn.api.DataAcquisitionService/AcquireData',
            request_serializer=pb2.AcquireDataRequest.SerializeToString,
            response_deserializer=pb2.AcquireDataResponse.FromString,
        )
        self.GetStatus = channel.unary_unary(
            '/bosdyn.api.DataAcquisitionService/GetStatus',
            request_serializer=pb2.GetStatusRequest.SerializeToString,
            response_deserializer=pb2.GetStatusResponse.FromString,
        )
        self.GetServiceInfo = channel.unary_unary(
            '/bosdyn.api.DataAcquisitionService/GetServiceInfo',
            request_serializer=pb2.GetServiceInfoRequest.SerializeToString,
            response_deserializer=pb2.GetServiceInfoResponse.FromString,
        )
        self.CancelAcquisition = channel.unary_unary(
            '/bosdyn.api.DataAcquisitionService/CancelAcquisition',
            request_serializer=pb2.CancelAcquisitionRequest.SerializeToString,
            response_deserializer=pb2.CancelAcquisitionResponse.FromString,
        )
        # GetLiveData uses LiveDataRequest/LiveDataResponse (no "Get" prefix
        # on the message types).
        self.GetLiveData = channel.unary_unary(
            '/bosdyn.api.DataAcquisitionService/GetLiveData',
            request_serializer=pb2.LiveDataRequest.SerializeToString,
            response_deserializer=pb2.LiveDataResponse.FromString,
        )
class DataAcquisitionServiceServicer(object):
    """Base servicer for bosdyn.api.DataAcquisitionService.

    The DataAcquisitionService is the main data acquisition service run on
    robot, which receives incoming requests and sends queries to all
    directory-registered DataAcquisitionPluginServices.

    Every method here marks the call UNIMPLEMENTED on the context and then
    raises NotImplementedError.
    """

    def AcquireData(self, request, context):
        """Trigger a data acquisition to save data and metadata to the data buffer.

        Sent by the tablet or a client to initiate a data acquisition and
        buffering process.
        """
        detail = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(detail)
        raise NotImplementedError(detail)

    def GetStatus(self, request, context):
        """Query the status of a data acquisition."""
        detail = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(detail)
        raise NotImplementedError(detail)

    def GetServiceInfo(self, request, context):
        """Get information from a Data Acquisition service; lists acquisition capabilities."""
        detail = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(detail)
        raise NotImplementedError(detail)

    def CancelAcquisition(self, request, context):
        """Cancel an in-progress data acquisition."""
        detail = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(detail)
        raise NotImplementedError(detail)

    def GetLiveData(self, request, context):
        """Request live data available from DAQ plugins during teleoperation.

        Please use the other RPCs for typical data acquisition.
        """
        detail = 'Method not implemented!'
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(detail)
        raise NotImplementedError(detail)
def add_DataAcquisitionServiceServicer_to_server(servicer, server):
    """Register a servicer's RPC methods with a server.

    Wraps each of the servicer's five RPC methods in a unary-unary method
    handler, groups them under a generic handler for
    'bosdyn.api.DataAcquisitionService', and adds that handler to `server`.

    Args:
        servicer: Object providing the AcquireData, GetStatus,
            GetServiceInfo, CancelAcquisition and GetLiveData methods.
        server: Object exposing add_generic_rpc_handlers
            (presumably a grpc.Server -- confirm against callers).
    """
    # Local aliases keep the handler table compact.
    pb2 = bosdyn_dot_api_dot_data__acquisition__pb2
    make_handler = grpc.unary_unary_rpc_method_handler

    handlers_by_name = {
        'AcquireData': make_handler(
            servicer.AcquireData,
            request_deserializer=pb2.AcquireDataRequest.FromString,
            response_serializer=pb2.AcquireDataResponse.SerializeToString,
        ),
        'GetStatus': make_handler(
            servicer.GetStatus,
            request_deserializer=pb2.GetStatusRequest.FromString,
            response_serializer=pb2.GetStatusResponse.SerializeToString,
        ),
        'GetServiceInfo': make_handler(
            servicer.GetServiceInfo,
            request_deserializer=pb2.GetServiceInfoRequest.FromString,
            response_serializer=pb2.GetServiceInfoResponse.SerializeToString,
        ),
        'CancelAcquisition': make_handler(
            servicer.CancelAcquisition,
            request_deserializer=pb2.CancelAcquisitionRequest.FromString,
            response_serializer=pb2.CancelAcquisitionResponse.SerializeToString,
        ),
        'GetLiveData': make_handler(
            servicer.GetLiveData,
            request_deserializer=pb2.LiveDataRequest.FromString,
            response_serializer=pb2.LiveDataResponse.SerializeToString,
        ),
    }

    service_handler = grpc.method_handlers_generic_handler(
        'bosdyn.api.DataAcquisitionService', handlers_by_name)
    # The server API takes a sequence of handlers; pass a one-element tuple.
    server.add_generic_rpc_handlers((service_handler,))
# This class is part of an EXPERIMENTAL API.
class DataAcquisitionService(object):
    """Static call helpers for bosdyn.api.DataAcquisitionService.

    The DataAcquisitionService is the main data acquisition service run on
    robot, which receives incoming requests and sends queries to all
    directory-registered DataAcquisitionPluginServices.

    Each static method forwards its arguments to
    grpc.experimental.unary_unary for the matching RPC path.
    """

    @staticmethod
    def AcquireData(request,
                    target,
                    options=(),
                    channel_credentials=None,
                    call_credentials=None,
                    insecure=False,
                    compression=None,
                    wait_for_ready=None,
                    timeout=None,
                    metadata=None):
        """Invoke the AcquireData RPC on `target` via grpc.experimental.unary_unary."""
        pb2 = bosdyn_dot_api_dot_data__acquisition__pb2
        # Arguments are passed positionally; note `insecure` precedes
        # `call_credentials` in the forwarded call, unlike this signature.
        return grpc.experimental.unary_unary(
            request,
            target,
            '/bosdyn.api.DataAcquisitionService/AcquireData',
            pb2.AcquireDataRequest.SerializeToString,
            pb2.AcquireDataResponse.FromString,
            options,
            channel_credentials,
            insecure,
            call_credentials,
            compression,
            wait_for_ready,
            timeout,
            metadata,
        )

    @staticmethod
    def GetStatus(request,
                  target,
                  options=(),
                  channel_credentials=None,
                  call_credentials=None,
                  insecure=False,
                  compression=None,
                  wait_for_ready=None,
                  timeout=None,
                  metadata=None):
        """Invoke the GetStatus RPC on `target` via grpc.experimental.unary_unary."""
        pb2 = bosdyn_dot_api_dot_data__acquisition__pb2
        # Arguments are passed positionally; note `insecure` precedes
        # `call_credentials` in the forwarded call, unlike this signature.
        return grpc.experimental.unary_unary(
            request,
            target,
            '/bosdyn.api.DataAcquisitionService/GetStatus',
            pb2.GetStatusRequest.SerializeToString,
            pb2.GetStatusResponse.FromString,
            options,
            channel_credentials,
            insecure,
            call_credentials,
            compression,
            wait_for_ready,
            timeout,
            metadata,
        )

    @staticmethod
    def GetServiceInfo(request,
                       target,
                       options=(),
                       channel_credentials=None,
                       call_credentials=None,
                       insecure=False,
                       compression=None,
                       wait_for_ready=None,
                       timeout=None,
                       metadata=None):
        """Invoke the GetServiceInfo RPC on `target` via grpc.experimental.unary_unary."""
        pb2 = bosdyn_dot_api_dot_data__acquisition__pb2
        # Arguments are passed positionally; note `insecure` precedes
        # `call_credentials` in the forwarded call, unlike this signature.
        return grpc.experimental.unary_unary(
            request,
            target,
            '/bosdyn.api.DataAcquisitionService/GetServiceInfo',
            pb2.GetServiceInfoRequest.SerializeToString,
            pb2.GetServiceInfoResponse.FromString,
            options,
            channel_credentials,
            insecure,
            call_credentials,
            compression,
            wait_for_ready,
            timeout,
            metadata,
        )

    @staticmethod
    def CancelAcquisition(request,
                          target,
                          options=(),
                          channel_credentials=None,
                          call_credentials=None,
                          insecure=False,
                          compression=None,
                          wait_for_ready=None,
                          timeout=None,
                          metadata=None):
        """Invoke the CancelAcquisition RPC on `target` via grpc.experimental.unary_unary."""
        pb2 = bosdyn_dot_api_dot_data__acquisition__pb2
        # Arguments are passed positionally; note `insecure` precedes
        # `call_credentials` in the forwarded call, unlike this signature.
        return grpc.experimental.unary_unary(
            request,
            target,
            '/bosdyn.api.DataAcquisitionService/CancelAcquisition',
            pb2.CancelAcquisitionRequest.SerializeToString,
            pb2.CancelAcquisitionResponse.FromString,
            options,
            channel_credentials,
            insecure,
            call_credentials,
            compression,
            wait_for_ready,
            timeout,
            metadata,
        )

    @staticmethod
    def GetLiveData(request,
                    target,
                    options=(),
                    channel_credentials=None,
                    call_credentials=None,
                    insecure=False,
                    compression=None,
                    wait_for_ready=None,
                    timeout=None,
                    metadata=None):
        """Invoke the GetLiveData RPC on `target` via grpc.experimental.unary_unary."""
        pb2 = bosdyn_dot_api_dot_data__acquisition__pb2
        # Arguments are passed positionally; note `insecure` precedes
        # `call_credentials` in the forwarded call, unlike this signature.
        return grpc.experimental.unary_unary(
            request,
            target,
            '/bosdyn.api.DataAcquisitionService/GetLiveData',
            pb2.LiveDataRequest.SerializeToString,
            pb2.LiveDataResponse.FromString,
            options,
            channel_credentials,
            insecure,
            call_credentials,
            compression,
            wait_for_ready,
            timeout,
            metadata,
        )