# Why Gemfury? Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages.

# Repository URL to install this package:

# Details
# bosdyn-client / client / spot_cam / compositor.py
# Size: Mime:
# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""For clients to the Spot CAM Compositor service."""

from google.protobuf.wrappers_pb2 import BoolValue

from bosdyn.api.spot_cam import compositor_pb2, service_pb2_grpc
from bosdyn.client.common import BaseClient, handle_common_header_errors


class CompositorClient(BaseClient):
    """Client wrapper around the Spot CAM Compositor service RPCs.

    Each public method builds the matching ``compositor_pb2`` request message and forwards it
    through ``self.call`` (plain variants) or ``self.call_async`` (``*_async`` variants), always
    passing ``copy_request=False`` and ``_compositor_error_from_response`` as the error handler.
    """
    default_service_name = 'spot-cam-compositor'
    service_type = 'bosdyn.api.spot_cam.CompositorService'

    def __init__(self):
        super(CompositorClient, self).__init__(service_pb2_grpc.CompositorServiceStub)

    # ------------------------------------------------------------------ screens

    def set_screen(self, name, **kwargs):
        """Select the view that is streamed over the network.

        Args:
            name: value placed in the ``name`` field of the SetScreenRequest.
            kwargs: extra arguments for controlling RPC details.

        Returns:
            Whatever ``self.call`` yields; ``_name_from_response`` (which reads ``response.name``)
            is supplied as the value extractor.
        """
        message = compositor_pb2.SetScreenRequest(name=name)
        return self.call(
            self._stub.SetScreen,
            message,
            self._name_from_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    def set_screen_async(self, name, **kwargs):
        """Same request as set_screen(), dispatched through ``self.call_async``."""
        message = compositor_pb2.SetScreenRequest(name=name)
        return self.call_async(
            self._stub.SetScreen,
            message,
            self._name_from_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    def get_screen(self, **kwargs):
        """Query the currently selected screen.

        Args:
            kwargs: extra arguments for controlling RPC details.

        Returns:
            Whatever ``self.call`` yields; ``_name_from_response`` is the value extractor.
        """
        message = compositor_pb2.GetScreenRequest()
        return self.call(
            self._stub.GetScreen,
            message,
            self._name_from_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    def get_screen_async(self, **kwargs):
        """Same request as get_screen(), dispatched through ``self.call_async``."""
        message = compositor_pb2.GetScreenRequest()
        return self.call_async(
            self._stub.GetScreen,
            message,
            self._name_from_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    def list_screens(self, **kwargs):
        """Query the available screens.

        Args:
            kwargs: extra arguments for controlling RPC details.

        Returns:
            Whatever ``self.call`` yields; ``_screens_from_response`` (which reads
            ``response.screens``) is the value extractor.
        """
        message = compositor_pb2.ListScreensRequest()
        return self.call(
            self._stub.ListScreens,
            message,
            self._screens_from_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    def list_screens_async(self, **kwargs):
        """Same request as list_screens(), dispatched through ``self.call_async``."""
        message = compositor_pb2.ListScreensRequest()
        return self.call_async(
            self._stub.ListScreens,
            message,
            self._screens_from_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    # ------------------------------------------------------------------ cameras

    def get_visible_cameras(self, **kwargs):
        """Query the cameras on Spot CAM.

        Args:
            kwargs: extra arguments for controlling RPC details.

        Returns:
            Whatever ``self.call`` yields; ``_streams_from_response`` (which reads
            ``response.streams``) is the value extractor.
        """
        message = compositor_pb2.GetVisibleCamerasRequest()
        return self.call(
            self._stub.GetVisibleCameras,
            message,
            self._streams_from_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    def get_visible_cameras_async(self, **kwargs):
        """Same request as get_visible_cameras(), dispatched through ``self.call_async``."""
        message = compositor_pb2.GetVisibleCamerasRequest()
        return self.call_async(
            self._stub.GetVisibleCameras,
            message,
            self._streams_from_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    # -------------------------------------------------------------- IR colormap

    def set_ir_colormap(self, colormap, min_temp, max_temp, auto_scale, **kwargs):
        """Choose the IR colormap used on Spot CAM.

        Args:
            colormap (bosdyn.api.spot_cam.compositor_pb2.IrColorMap.ColorMap): IR display colormap
            min_temp (Float): minimum temperature on the temperature scale
            max_temp (Float): maximum temperature on the temperature scale
            auto_scale (Boolean): Auto-scale the color map. This is the most human-understandable
                option. min_temp and max_temp are ignored if this is set to True
            kwargs: extra arguments for controlling RPC details.

        Returns:
            Whatever ``self.call`` yields; the response is passed through unmodified by
            ``_return_response``.
        """
        message = self._make_ir_colormap_request(colormap, min_temp, max_temp, auto_scale)
        return self.call(
            self._stub.SetIrColormap,
            message,
            self._return_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    def set_ir_colormap_async(self, colormap, min_temp, max_temp, auto_scale, **kwargs):
        """Same request as set_ir_colormap(), dispatched through ``self.call_async``."""
        message = self._make_ir_colormap_request(colormap, min_temp, max_temp, auto_scale)
        return self.call_async(
            self._stub.SetIrColormap,
            message,
            self._return_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    def get_ir_colormap(self, **kwargs):
        """Query the IR colormap currently selected on Spot CAM.

        Args:
            kwargs: extra arguments for controlling RPC details.

        Returns:
            Whatever ``self.call`` yields; ``_colormap_from_response`` (which reads
            ``response.map``) is the value extractor.
        """
        message = compositor_pb2.GetIrColormapRequest()
        return self.call(
            self._stub.GetIrColormap,
            message,
            self._colormap_from_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    def get_ir_colormap_async(self, **kwargs):
        """Same request as get_ir_colormap(), dispatched through ``self.call_async``."""
        message = compositor_pb2.GetIrColormapRequest()
        return self.call_async(
            self._stub.GetIrColormap,
            message,
            self._colormap_from_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    # --------------------------------------------------------- IR meter overlay

    def set_ir_meter_overlay(self, x, y, enable, unit, **kwargs):
        """Place a single IR reticle on the Spot CAM IR display.

        Args:
            x (Float): (0,1) horizontal coordinate of reticle
            y (Float): (0,1) vertical coordinate of reticle
            enable (Boolean): Enable the reticle on the display
            unit (TempUnit): Temperature unit to display
            kwargs: extra arguments for controlling RPC details.

        Returns:
            Whatever ``self.call`` yields; the response is passed through unmodified by
            ``_return_response``.
        """
        message = self._make_single_meter_request(x, y, enable, unit)
        return self.call(
            self._stub.SetIrMeterOverlay,
            message,
            self._return_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    def set_ir_meter_overlay_async(self, x, y, enable, unit, **kwargs):
        """Same request as set_ir_meter_overlay(), dispatched through ``self.call_async``."""
        message = self._make_single_meter_request(x, y, enable, unit)
        return self.call_async(
            self._stub.SetIrMeterOverlay,
            message,
            self._return_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    def set_multi_ir_meter_overlay(self, coords, enable, unit, **kwargs):
        """Place several IR reticles on the Spot CAM IR display.

        Args:
            coords (List[Tuple(Float, Float)]): List of (x, y) reticle coordinates in range (0,1)
                e.g. [(0.1, 0.2), (0.2, 0.4), (0.7, 0.7)]
            enable (Boolean): Enable the reticles on the display
            unit (TempUnit): Temperature unit to display
            kwargs: extra arguments for controlling RPC details.

        Returns:
            Whatever ``self.call`` yields; the response is passed through unmodified by
            ``_return_response``.
        """
        message = self._make_multi_meter_request(coords, enable, unit)
        return self.call(
            self._stub.SetIrMeterOverlay,
            message,
            self._return_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    def set_multi_ir_meter_overlay_async(self, coords, enable, unit, **kwargs):
        """Same request as set_multi_ir_meter_overlay(), dispatched through ``self.call_async``."""
        message = self._make_multi_meter_request(coords, enable, unit)
        return self.call_async(
            self._stub.SetIrMeterOverlay,
            message,
            self._return_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    def get_ir_meter_overlay(self, **kwargs):
        """Query the current IR reticle positions.

        Args:
            kwargs: extra arguments for controlling RPC details.

        Returns:
            Whatever ``self.call`` yields; the response is passed through unmodified by
            ``_return_response``.
        """
        message = compositor_pb2.GetIrMeterOverlayRequest()
        return self.call(
            self._stub.GetIrMeterOverlay,
            message,
            self._return_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    def get_ir_meter_overlay_async(self, **kwargs):
        """Same request as get_ir_meter_overlay(), dispatched through ``self.call_async``."""
        message = compositor_pb2.GetIrMeterOverlayRequest()
        return self.call_async(
            self._stub.GetIrMeterOverlay,
            message,
            self._return_response,
            self._compositor_error_from_response,
            copy_request=False,
            **kwargs,
        )

    # --------------------------------------------------------- request builders

    @staticmethod
    def _make_ir_colormap_request(colormap, min_temp, max_temp, auto_scale):
        """Build the SetIrColormapRequest used by set_ir_colormap() and its async variant."""
        return compositor_pb2.SetIrColormapRequest(
            map=compositor_pb2.IrColorMap(
                colormap=colormap,
                scale=compositor_pb2.IrColorMap.ScalingPair(min=min_temp, max=max_temp),
                # auto_scale is sent wrapped in a BoolValue message rather than as a bare bool.
                auto_scale=BoolValue(value=auto_scale),
            ))

    @staticmethod
    def _make_single_meter_request(x, y, enable, unit):
        """Build the SetIrMeterOverlayRequest used by set_ir_meter_overlay() and its async variant."""
        point = compositor_pb2.IrMeterOverlay.NormalizedCoordinates(x=x, y=y)
        # The same point is written to both `coords` and `meter` for backwards compatibility.
        return compositor_pb2.SetIrMeterOverlayRequest(
            overlay=compositor_pb2.IrMeterOverlay(enable=enable, coords=point, meter=[point],
                                                  unit=unit))

    @staticmethod
    def _make_multi_meter_request(coords, enable, unit):
        """Build the SetIrMeterOverlayRequest used by set_multi_ir_meter_overlay() and its async variant."""
        make_point = compositor_pb2.IrMeterOverlay.NormalizedCoordinates
        points = [make_point(x=px, y=py) for px, py in coords]
        # Only the repeated `meter` field is populated here; `coords` is left unset.
        return compositor_pb2.SetIrMeterOverlayRequest(
            overlay=compositor_pb2.IrMeterOverlay(enable=enable, meter=points, unit=unit))

    # ------------------------------------------------------ response extractors

    @staticmethod
    def _return_response(response):
        """Hand the response back untouched."""
        return response

    @staticmethod
    def _name_from_response(response):
        """Pull the ``name`` field out of a response."""
        return response.name

    @staticmethod
    def _screens_from_response(response):
        """Pull the ``screens`` field out of a response."""
        return response.screens

    @staticmethod
    def _streams_from_response(response):
        """Pull the ``streams`` field out of a response."""
        return response.streams

    @staticmethod
    def _colormap_from_response(response):
        """Pull the ``map`` field out of a response."""
        return response.map

    @staticmethod
    @handle_common_header_errors
    def _compositor_error_from_response(response):  # pylint: disable=unused-argument
        """Report no service-specific error; the body itself always returns None.

        The function is wrapped by ``handle_common_header_errors``, so any error detection comes
        from that decorator rather than from this body.
        """
        return None