Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
habitat_sim / simulator.py
Size: Mime:
#!/usr/bin/env python3

# Copyright (c) Meta Platforms, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import time
from collections import OrderedDict
from collections.abc import MutableMapping
from os import path as osp
from typing import Any, Dict, List
from typing import MutableMapping as MutableMapping_T
from typing import Optional, Union, cast, overload

import attr
import magnum as mn
import numpy as np
from magnum import Vector3
from numpy import ndarray

try:
    import torch
    from torch import Tensor

    _HAS_TORCH = True
except ImportError:
    _HAS_TORCH = False

import habitat_sim.errors
from habitat_sim.agent.agent import Agent, AgentConfiguration, AgentState
from habitat_sim.bindings import cuda_enabled
from habitat_sim.logging import LoggingContext, logger
from habitat_sim.metadata import MetadataMediator
from habitat_sim.nav import GreedyGeodesicFollower, NavMeshSettings
from habitat_sim.sensor import SensorSpec, SensorType
from habitat_sim.sensors.noise_models import make_sensor_noise_model
from habitat_sim.sim import SimulatorBackend, SimulatorConfiguration
from habitat_sim.utils.common import quat_from_angle_axis

# TODO maybe clean up types with TypeVars
# Maps an observation key to its value.  Simulator.step() stores a bool under
# the "collided" key next to the per-sensor entries keyed by sensor uuid.
# "Tensor" is a string forward reference, so this alias can still be built
# when the optional torch import above failed.
ObservationDict = Dict[str, Union[bool, np.ndarray, "Tensor"]]


@attr.s(auto_attribs=True, slots=True)
class Configuration:
    r"""Specifies how to configure the simulator.

    :property sim_cfg: The configuration of the backend of the simulator
    :property agents: A list of agent configurations
    :property metadata_mediator: (optional) The metadata mediator to build the simulator from.
    :property enable_batch_renderer: (optional) Whether the batch renderer is
        used instead of the classic per-sensor renderer. Defaults to False.

    Ties together a backend config, `sim_cfg` and a list of agent
    configurations `agents`.
    """

    sim_cfg: SimulatorConfiguration
    agents: List[AgentConfiguration]
    # An existing Metadata Mediator can also be used to construct a SimulatorBackend
    metadata_mediator: Optional[MetadataMediator] = None
    # When True, Simulator._sanitize_config turns off sim_cfg.create_renderer and
    # Sensor skips allocating its own observation buffers.
    enable_batch_renderer: bool = False


@attr.s(auto_attribs=True)
class Simulator(SimulatorBackend):
    r"""The core class of habitat-sim

    :property config: configuration for the simulator

    The simulator ties together the backend, the agent, controls functions,
    NavMesh collision checking/pathfinding, attribute template management,
    object manipulation, and physics simulation.
    """

    # The only required constructor argument; applied in __attrs_post_init__.
    config: Configuration
    # One Agent per entry of config.agents; rebuilt by _config_agents.
    agents: List[Agent] = attr.ib(factory=list, init=False)
    # Incremented once per step() call.
    _num_total_frames: int = attr.ib(default=0, init=False)
    # Agent used whenever a method is called without an explicit agent id.
    _default_agent_id: int = attr.ib(default=0, init=False)
    # Indexed by agent id; each entry maps a sensor uuid to its Sensor wrapper.
    __sensors: List[Dict[str, "Sensor"]] = attr.ib(factory=list, init=False)
    # Set by _config_backend once the backend constructor has run; close()
    # returns early while this is False.
    _initialized: bool = attr.ib(default=False, init=False)
    _previous_step_time: float = attr.ib(
        default=0.0, init=False
    )  # track the compute time of each step
    # Agent id(s) of the async render currently in flight, or None when no
    # async render is pending.
    _async_draw_agent_ids: Optional[Union[int, List[int]]] = None
    # Most recently recorded AgentState for each agent id.
    __last_state: Dict[int, AgentState] = attr.ib(factory=dict, init=False)

    @staticmethod
    def _sanitize_config(config: Configuration) -> None:
        r"""Validate ``config`` and derive the backend flags implied by its sensors.

        Mutates ``config.sim_cfg`` in place:

        - ``create_renderer`` is enabled only when the batch renderer is off
          and at least one agent has a sensor specification.
        - ``load_semantic_mesh`` is OR-ed with "some agent has a SEMANTIC
          sensor", so a value already set by the caller is never cleared.
        - ``requires_textures`` is set to "some agent has a COLOR sensor".

        :param config: The simulator configuration to check and update.
        :raises RuntimeError: If ``config.agents`` is empty.
        """
        if not config.agents:
            raise RuntimeError(
                "Config has no agents specified.  Must specify at least 1 agent"
            )

        def has_sensor_of_type(sensor_type: SensorType) -> bool:
            # True when any agent declares at least one sensor of this type.
            return any(
                sens_spec.sensor_type == sensor_type
                for cfg in config.agents
                for sens_spec in cfg.sensor_specifications
            )

        config.sim_cfg.create_renderer = not config.enable_batch_renderer and any(
            len(cfg.sensor_specifications) > 0 for cfg in config.agents
        )
        config.sim_cfg.load_semantic_mesh |= has_sensor_of_type(SensorType.SEMANTIC)
        config.sim_cfg.requires_textures = has_sensor_of_type(SensorType.COLOR)

    def __attrs_post_init__(self) -> None:
        r"""Finish construction after the attrs-generated ``__init__`` has run.

        Re-reads the logging configuration from the environment, sanitizes
        ``self.config`` in place, and then configures the backend, agents,
        pathfinder and sensors from it.
        """
        LoggingContext.reinitialize_from_env()
        # Sanitizing first ensures the derived sim_cfg flags are in place
        # before the backend is constructed from them.
        self._sanitize_config(self.config)
        self.__set_from_config(self.config)

    def close(self, destroy: bool = True) -> None:
        r"""Close the simulator instance.

        Closes every sensor wrapper and agent, clears the recorded agent
        states, and finally closes the backend.

        :param destroy: Whether or not to force the OpenGL context to be
            destroyed if async rendering was used.  If async rendering wasn't used,
            this has no effect.
        """
        # NB: Python still calls __del__ (and thus close) even if __init__
        # fails.  In that case ``_initialized`` may never have been assigned,
        # so read it defensively rather than raising AttributeError from
        # __del__.  There is nothing to close if we aren't initialized.
        if not getattr(self, "_initialized", False):
            return

        if self.renderer is not None:
            self.renderer.acquire_gl_context()

        for agent_sensorsuite in self.__sensors:
            for sensor in agent_sensorsuite.values():
                sensor.close()
                # Unbind the loop variable so this frame holds no reference to
                # the sensor while the backend is being closed below.
                del sensor

        self.__sensors = []

        for agent in self.agents:
            agent.close()
            # Same reasoning as for the sensors above.
            del agent

        self.agents = []

        self.__last_state.clear()

        super().close(destroy)

    def __enter__(self) -> "Simulator":
        r"""Enter a ``with`` block; returns this simulator unchanged."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        r"""Leave a ``with`` block by closing the simulator.

        The exception arguments are ignored and nothing is returned, so an
        exception raised inside the block is not suppressed.
        """
        self.close(destroy=True)

    def seed(self, new_seed: int) -> None:
        r"""Seed both the backend and the pathfinder with the same value.

        :param new_seed: The seed passed to the backend and to ``self.pathfinder``.
        """
        super().seed(new_seed)
        self.pathfinder.seed(new_seed)

    @overload
    def reset(self, agent_ids: List[int]) -> Dict[int, ObservationDict]:
        ...

    @overload
    def reset(self, agent_ids: Optional[int] = None) -> ObservationDict:
        ...

    def reset(
        self, agent_ids: Union[Optional[int], List[int]] = None
    ) -> Union[ObservationDict, Dict[int, ObservationDict],]:
        r"""Reset the backend and every agent, then return fresh observations.

        :param agent_ids: ``None`` to observe only the default agent; any other
            value is forwarded unchanged to ``get_sensor_observations``.
        :return: The default agent's observations when ``agent_ids`` is
            ``None``, otherwise whatever ``get_sensor_observations`` returns
            for ``agent_ids``.
        """
        super().reset()
        for agent_index, _ in enumerate(self.agents):
            self.reset_agent(agent_index)

        if agent_ids is not None:
            return self.get_sensor_observations(agent_ids=agent_ids)

        # No ids given: observe the default agent and unwrap its entry.
        default_id = self._default_agent_id
        return self.get_sensor_observations(agent_ids=[default_id])[default_id]

    def reset_agent(self, agent_id: int) -> None:
        r"""Put an agent back into its recorded initial state.

        :param agent_id: Index of the agent to reset.
        :raises RuntimeError: If the agent has no initial state yet.
        """
        starting_state = self.get_agent(agent_id).initial_state
        if starting_state is not None:
            self.initialize_agent(agent_id, starting_state)
            return

        raise RuntimeError("reset called before agent was given an initial state")

    def _config_backend(self, config: Configuration) -> None:
        r"""Construct the backend on first use, or reconfigure it afterwards.

        :param config: Supplies ``sim_cfg`` and, for the first construction
            only, ``metadata_mediator``.
        """
        if self._initialized:
            # The backend already exists, so only its configuration is updated.
            super().reconfigure(config.sim_cfg)
            return

        super().__init__(config.sim_cfg, config.metadata_mediator)
        self._initialized = True

    def _config_agents(self, config: Configuration) -> None:
        r"""Create one Agent per agent configuration.

        Each agent gets its own new child node under the root of the active
        scene graph.  ``self.agents`` is replaced once all agents are built.

        :param config: Supplies the list of agent configurations.
        """
        created: List[Agent] = []
        for agent_cfg in config.agents:
            agent_node = self.get_active_scene_graph().get_root_node().create_child()
            created.append(Agent(agent_node, agent_cfg))
        self.agents = created

    def _config_pathfinder(self, config: Configuration) -> None:
        r"""Load or recompute the NavMesh for the configured scene, then seed it.

        A ``.navmesh`` file next to the scene is loaded when it exists and no
        NavMesh is loaded yet.  The NavMesh is then recomputed if its settings
        differ from those needed by the default agent, or if none was loaded
        and recomputation is possible.

        :param config: Supplies the scene id, default agent and random seed.
        """
        scene_id = config.sim_cfg.scene_id
        scene_dir, scene_basename = osp.split(scene_id)
        if scene_basename == "mesh.ply":
            # "mesh.ply" is identified as a replica model, whose navmesh
            # is named as "mesh_semantic.navmesh" and is placed in the
            # subfolder called "habitat" (a level deeper than the "mesh.ply")
            navmesh_filename = osp.join(scene_dir, "habitat", "mesh_semantic.navmesh")
        else:
            navmesh_filename = osp.splitext(scene_id)[0] + ".navmesh"

        if osp.exists(navmesh_filename) and not self.pathfinder.is_loaded:
            self.pathfinder.load_nav_mesh(navmesh_filename)
            logger.info(f"Loaded navmesh {navmesh_filename}")

        # NOTE: this recomputed NavMesh does not include STATIC objects.
        default_agent_config = config.agents[config.sim_cfg.default_agent_id]
        needed_settings = NavMeshSettings()
        needed_settings.agent_radius = default_agent_config.radius
        needed_settings.agent_height = default_agent_config.height

        if self.pathfinder.is_loaded:
            # If we loaded a navmesh and we need one with different settings,
            # always try and recompute
            should_recompute = self.pathfinder.nav_mesh_settings != needed_settings
        else:
            # If we didn't load a navmesh, only try to recompute one if we can.
            # This allows for use cases where we just want to view a single
            # object or similar.
            should_recompute = (
                config.sim_cfg.scene_id.lower() != "none"
                and config.sim_cfg.create_renderer
            )

        if should_recompute:
            logger.info(
                f"Recomputing navmesh for agent's height {default_agent_config.height} and radius"
                f" {default_agent_config.radius}."
            )
            self.recompute_navmesh(self.pathfinder, needed_settings)

        self.pathfinder.seed(config.sim_cfg.random_seed)

        if self.pathfinder.is_loaded:
            return

        logger.warning(
            "Could not find a navmesh nor could one be computed, "
            "no collision checking will be done"
        )

    def reconfigure(self, config: Configuration) -> None:
        r"""Apply a new configuration if it differs from the current one.

        ``config`` is always sanitized (and therefore mutated) first; the
        simulator is only rebuilt from it when it compares unequal to
        ``self.config``.

        :param config: The configuration to switch to.
        """
        self._sanitize_config(config)

        if self.config != config:
            # Store the new config only after it has been applied.
            self.__set_from_config(config)
            self.config = config

    def __set_from_config(self, config: Configuration) -> None:
        r"""Rebuild backend, agents, pathfinder and sensors from ``config``.

        :param config: An already sanitized configuration.
        """
        self._config_backend(config)
        self._config_agents(config)
        self._config_pathfinder(config)
        self.frustum_culling = config.sim_cfg.frustum_culling

        # Route every agent's movement through the NavMesh-aware step filter.
        for agent in self.agents:
            agent.controls.move_filter_fn = self.step_filter

        self._default_agent_id = config.sim_cfg.default_agent_id

        # Start from empty per-agent sensor maps and an empty state record.
        self.__sensors: List[Dict[str, Sensor]] = [{} for _ in config.agents]
        self.__last_state = {}
        for agent_id, agent_cfg in enumerate(config.agents):
            for sensor_spec in agent_cfg.sensor_specifications:
                self._update_simulator_sensors(sensor_spec.uuid, agent_id=agent_id)
            self.initialize_agent(agent_id)

    def _update_simulator_sensors(self, uuid: str, agent_id: int) -> None:
        r"""Create the Sensor wrapper for one of an agent's sensors and store it.

        :param uuid: The uuid of the sensor on the agent.
        :param agent_id: Index of the agent that owns the sensor.
        """
        owner = self.get_agent(agent_id)
        wrapper = Sensor(sim=self, agent=owner, sensor_id=uuid)
        self.__sensors[agent_id][uuid] = wrapper

    def add_sensor(
        self, sensor_spec: SensorSpec, agent_id: Optional[int] = None
    ) -> None:
        r"""Add a sensor to an agent after the simulator has been created.

        :param sensor_spec: Specification of the sensor to add.
        :param agent_id: Agent to attach the sensor to; the default agent when
            ``None``.
        :raises ValueError: If the data needed by a SEMANTIC, COLOR or DEPTH
            sensor was not enabled in the current ``sim_cfg``.
        """
        sim_cfg = self.config.sim_cfg
        requested_type = sensor_spec.sensor_type
        data_unavailable = (
            (requested_type == SensorType.SEMANTIC and not sim_cfg.load_semantic_mesh)
            or (requested_type == SensorType.COLOR and not sim_cfg.requires_textures)
            or (requested_type == SensorType.DEPTH and not sim_cfg.create_renderer)
        )
        if data_unavailable:
            sensor_type = sensor_spec.sensor_type
            raise ValueError(
                f"""Data for {sensor_type} sensor was not loaded during Simulator init.
                    Cannot dynamically add a {sensor_type} sensor unless one already exists.
                    """
            )

        target_id = self._default_agent_id if agent_id is None else agent_id
        self.get_agent(agent_id=target_id)._add_sensor(sensor_spec)
        self._update_simulator_sensors(sensor_spec.uuid, agent_id=target_id)

    def get_agent(self, agent_id: int) -> Agent:
        r"""Return the agent stored at index ``agent_id`` of ``self.agents``.

        :param agent_id: Index into ``self.agents``.
        """
        return self.agents[agent_id]

    def initialize_agent(
        self, agent_id: int, initial_state: Optional[AgentState] = None
    ) -> Agent:
        r"""Set an agent's initial state and record it as its last state.

        :param agent_id: Index of the agent to initialize.
        :param initial_state: State to apply.  When ``None`` a new
            ``AgentState`` is used; if a NavMesh is loaded it is given a random
            navigable position and a random rotation about the Y axis.
        :return: The initialized agent.
        """
        agent = self.get_agent(agent_id=agent_id)

        state = initial_state
        if state is None:
            state = AgentState()
            if self.pathfinder.is_loaded:
                # Keep this order: position is sampled before the rotation
                # angle so the sequence of random draws is unchanged.
                state.position = self.pathfinder.get_random_navigable_point()
                angle = self.random.uniform_float(0, 2.0 * np.pi)
                state.rotation = quat_from_angle_axis(angle, np.array([0, 1, 0]))

        agent.set_state(state, is_initial=True)
        self.__last_state[agent_id] = agent.state
        return agent

    def start_async_render_and_step_physics(
        self, dt: float, agent_ids: Union[int, List[int]] = 0
    ):
        r"""Start asynchronous rendering for the given agents, then step physics.

        :param dt: Passed to ``step_physics`` after the draw jobs were started.
        :param agent_ids: A single agent id or a list of agent ids to render.
        :raises RuntimeError: If an async render is already pending.
        """
        # start_async_render performs the same batch-renderer assertion,
        # pending-render check (with the same message), bookkeeping and
        # draw-job submission that this method needs.
        self.start_async_render(agent_ids)
        self.step_physics(dt)

    def start_async_render(self, agent_ids: Union[int, List[int]] = 0):
        r"""Start asynchronous rendering for the given agents.

        The ids are remembered, exactly as passed, until
        ``get_sensor_observations_async_finish`` is called.

        :param agent_ids: A single agent id or a list of agent ids to render.
        :raises RuntimeError: If an async render is already pending.
        """
        assert not self.config.enable_batch_renderer

        if self._async_draw_agent_ids is not None:
            raise RuntimeError(
                "start_async_render_and_step_physics was already called.  "
                "Call get_sensor_observations_async_finish before calling this again.  "
                "Use step_physics to step physics additional times."
            )

        self._async_draw_agent_ids = agent_ids
        ids_to_draw = [agent_ids] if isinstance(agent_ids, int) else agent_ids

        for agent_id in ids_to_draw:
            for sensor in self.__sensors[agent_id].values():
                sensor._draw_observation_async()

        self.renderer.start_draw_jobs()

    def get_sensor_observations_async_finish(
        self,
    ) -> Union[
        Dict[str, Union[ndarray, "Tensor"]],
        Dict[int, Dict[str, Union[ndarray, "Tensor"]]],
    ]:
        r"""Wait for the pending async render and collect its observations.

        :return: A uuid-to-observation dict when the render was started with a
            single int agent id, otherwise a dict from agent id to such a dict.
        :raises RuntimeError: If no async render is pending.
        """
        assert not self.config.enable_batch_renderer

        pending_ids = self._async_draw_agent_ids
        if pending_ids is None:
            raise RuntimeError(
                "get_sensor_observations_async_finish was called before calling start_async_render_and_step_physics."
            )

        # Clear the pending marker before waiting so a new render can be started.
        self._async_draw_agent_ids = None
        return_single = isinstance(pending_ids, int)
        agent_ids = [pending_ids] if return_single else pending_ids

        self.renderer.wait_draw_jobs()
        # As backport. All Dicts are ordered in Python >= 3.7
        observations: Dict[int, Dict[str, Union[ndarray, "Tensor"]]] = OrderedDict()
        for agent_id in agent_ids:
            observations[agent_id] = {
                sensor_uuid: sensor._get_observation_async()
                for sensor_uuid, sensor in self.__sensors[agent_id].items()
            }

        if not return_single:
            return observations
        return next(iter(observations.values()))

    @overload
    def get_sensor_observations(self, agent_ids: int = 0) -> ObservationDict:
        ...

    @overload
    def get_sensor_observations(
        self, agent_ids: List[int]
    ) -> Dict[int, ObservationDict]:
        ...

    def get_sensor_observations(
        self, agent_ids: Union[int, List[int]] = 0
    ) -> Union[ObservationDict, Dict[int, ObservationDict],]:
        r"""Draw (unless batch rendering) and read the sensors of the given agents.

        :param agent_ids: A single agent id or a list of agent ids.
        :return: A uuid-to-observation dict for an int ``agent_ids``, otherwise
            a dict from agent id to such a dict.
        """
        return_single = isinstance(agent_ids, int)
        if return_single:
            agent_ids = [agent_ids]

        # Draw observations (for classic non-batched renderer).
        # The batch renderer draws observations from external code;
        # sensors are then only used as data containers.
        if not self.config.enable_batch_renderer:
            for agent_id in agent_ids:
                for sensor in self.__sensors[agent_id].values():
                    sensor.draw_observation()

        # Get observations.
        # As backport. All Dicts are ordered in Python >= 3.7.
        observations: Dict[int, ObservationDict] = OrderedDict()
        for agent_id in agent_ids:
            observations[agent_id] = {
                sensor_uuid: sensor.get_observation()
                for sensor_uuid, sensor in self.__sensors[agent_id].items()
            }

        if not return_single:
            return observations
        return next(iter(observations.values()))

    @property
    def _default_agent(self) -> Agent:
        r"""The agent whose id is ``self._default_agent_id``."""
        # TODO Deprecate and remove
        return self.get_agent(agent_id=self._default_agent_id)

    @property
    def _last_state(self) -> AgentState:
        r"""The last recorded state of the default agent."""
        # TODO Deprecate and remove
        return self.__last_state[self._default_agent_id]

    @_last_state.setter
    def _last_state(self, state: AgentState) -> None:
        # Overwrites the recorded state of the default agent only.
        # TODO Deprecate and remove
        self.__last_state[self._default_agent_id] = state

    @property
    def _sensors(self) -> Dict[str, "Sensor"]:
        r"""The uuid-to-Sensor mapping of the default agent."""
        # TODO Deprecate and remove
        return self.__sensors[self._default_agent_id]

    def last_state(self, agent_id: Optional[int] = None) -> AgentState:
        r"""Return the last recorded state of an agent.

        :param agent_id: Agent to look up; the default agent when ``None``.
        """
        lookup_id = self._default_agent_id if agent_id is None else agent_id
        return self.__last_state[lookup_id]

    @overload
    def step(self, action: Union[str, int], dt: float = 1.0 / 60.0) -> ObservationDict:
        ...

    @overload
    def step(
        self, action: MutableMapping_T[int, Union[str, int]], dt: float = 1.0 / 60.0
    ) -> Dict[int, ObservationDict]:
        ...

    def step(
        self,
        action: Union[str, int, MutableMapping_T[int, Union[str, int]]],
        dt: float = 1.0 / 60.0,
    ) -> Union[ObservationDict, Dict[int, ObservationDict],]:
        r"""Apply agent actions, step the world, and return new observations.

        :param action: Either a single action for the default agent, or a
            mutable mapping from agent id to that agent's action.
        :param dt: Passed to ``step_world``.
        :return: For a single action, the default agent's observations; for a
            mapping, a dict from agent id to observations.  Each observation
            dict additionally carries the result of ``agent.act`` under the
            ``"collided"`` key.
        """
        self._num_total_frames += 1
        if isinstance(action, MutableMapping):
            return_single = False
        else:
            action = cast(Dict[int, Union[str, int]], {self._default_agent_id: action})
            return_single = True
        collided_dict: Dict[int, bool] = {}
        for agent_id, agent_act in action.items():
            agent = self.get_agent(agent_id)
            collided_dict[agent_id] = agent.act(agent_act)
            self.__last_state[agent_id] = agent.get_state()

        # step physics by dt
        # perf_counter is monotonic and high resolution, so the measured
        # duration cannot be skewed by wall-clock adjustments.
        step_start_time = time.perf_counter()
        super().step_world(dt)
        self._previous_step_time = time.perf_counter() - step_start_time

        multi_observations = self.get_sensor_observations(agent_ids=list(action.keys()))
        for agent_id, agent_observation in multi_observations.items():
            agent_observation["collided"] = collided_dict[agent_id]
        if return_single:
            return multi_observations[self._default_agent_id]
        return multi_observations

    def make_greedy_follower(
        self,
        agent_id: Optional[int] = None,
        goal_radius: Optional[float] = None,
        *,
        stop_key: Optional[Any] = None,
        forward_key: Optional[Any] = None,
        left_key: Optional[Any] = None,
        right_key: Optional[Any] = None,
        fix_thrashing: bool = True,
        thrashing_threshold: int = 16,
    ):
        r"""Create a GreedyGeodesicFollower bound to this simulator's pathfinder.

        :param agent_id: Agent the follower is created for; the default agent
            when ``None``.
        :param goal_radius: Forwarded to ``GreedyGeodesicFollower`` unchanged.

        All keyword-only arguments are forwarded to ``GreedyGeodesicFollower``
        under the same names.
        """
        if agent_id is None:
            agent_id = self._default_agent_id
        return GreedyGeodesicFollower(
            self.pathfinder,
            self.get_agent(agent_id),
            goal_radius,
            stop_key=stop_key,
            forward_key=forward_key,
            left_key=left_key,
            right_key=right_key,
            fix_thrashing=fix_thrashing,
            thrashing_threshold=thrashing_threshold,
        )

    def step_filter(self, start_pos: Vector3, end_pos: Vector3) -> Vector3:
        r"""Constrain a requested translation to the NavMesh.

        Honors the ``allow_sliding`` flag of the simulator configuration.  When
        no NavMesh is loaded the requested end position is returned unchanged.

        :param start_pos: The valid initial position of a translation.
        :param end_pos: The target end position of a translation.
        :return: The end position to actually use.
        """
        pathfinder = self.pathfinder
        if not pathfinder.is_loaded:
            return end_pos

        if self.config.sim_cfg.allow_sliding:
            return pathfinder.try_step(start_pos, end_pos)
        return pathfinder.try_step_no_sliding(start_pos, end_pos)

    def __del__(self) -> None:
        # Ensure the simulator is closed when the object is finalized; close()
        # returns early if the simulator was never initialized.
        self.close(destroy=True)

    def step_physics(self, dt: float, scene_id: int = 0) -> None:
        r"""Advance the simulated world by calling ``step_world(dt)``.

        :param dt: Passed through to ``step_world``.
        :param scene_id: Accepted but not used by this implementation.
        """
        self.step_world(dt)


class Sensor:
    r"""Wrapper around habitat_sim.Sensor

    Pairs a sensor object attached to an agent with the buffer, image view
    and noise model used to read observations from it.

    TODO(MS) define entire Sensor class in python, reducing complexity
    """
    # Type alias for an observation buffer: a numpy array, or a torch Tensor
    # on the gpu2gpu path.  NOTE(review): appears unused within this file.
    buffer = Union[np.ndarray, "Tensor"]

    def __init__(self, sim: Simulator, agent: Agent, sensor_id: str) -> None:
        r"""Wrap the sensor ``sensor_id`` of ``agent``.

        :param sim: The simulator that owns the agent.
        :param agent: The agent the sensor belongs to.
        :param sensor_id: Key of the sensor in ``agent._sensors``.
        """
        self._sim = sim
        self._agent = agent

        # The sensor is an object attached to a scene node; keep a handle to
        # that attached object together with its specification.
        sensor_object = agent._sensors[sensor_id]
        self._sensor_object = sensor_object
        self._spec = sensor_object.specification()

        # When using the batch renderer, no memory is allocated here.
        if sim.config.enable_batch_renderer:
            return
        self._initialize_sensor()

    def _initialize_sensor(self):
        r"""
        Allocate buffers and initialize noise model in preparation for rendering.

        AUDIO sensors need neither and return immediately.  For other sensors
        the buffer dtype depends on the sensor type: 32-bit integers for
        SEMANTIC, float32 for DEPTH and uint8 otherwise.  With
        ``gpu2gpu_transfer`` the buffer is a CUDA torch tensor; otherwise it is
        a numpy array wrapped in a magnum image view stored in ``self.view``.
        """
        if self._spec.sensor_type == SensorType.AUDIO:
            return

        if self._sim.renderer is not None:
            self._sim.renderer.bind_render_target(self._sensor_object)

        if self._spec.gpu2gpu_transfer:
            assert cuda_enabled, "Must build habitat sim with cuda for gpu2gpu-transfer"
            assert _HAS_TORCH
            device = torch.device("cuda", self._sim.gpu_device)  # type: ignore[attr-defined]
            torch.cuda.set_device(device)

            # NOTE: no ``self.view`` is created on this path.
            resolution = self._spec.resolution
            if self._spec.sensor_type == SensorType.SEMANTIC:
                self._buffer: Union[np.ndarray, "Tensor"] = torch.empty(
                    resolution[0], resolution[1], dtype=torch.int32, device=device
                )
            elif self._spec.sensor_type == SensorType.DEPTH:
                self._buffer = torch.empty(
                    resolution[0], resolution[1], dtype=torch.float32, device=device
                )
            else:
                # Any other sensor type gets a fixed 4-channel uint8 buffer.
                self._buffer = torch.empty(
                    resolution[0], resolution[1], 4, dtype=torch.uint8, device=device
                )
        else:
            # The view is sized from the framebuffer while the numpy buffer is
            # sized from the spec resolution.
            # NOTE(review): presumably these always agree -- confirm.
            size = self._sensor_object.framebuffer_size
            if self._spec.sensor_type == SensorType.SEMANTIC:
                self._buffer = np.empty(
                    (self._spec.resolution[0], self._spec.resolution[1]),
                    dtype=np.uint32,
                )
                self.view = mn.MutableImageView2D(
                    mn.PixelFormat.R32UI, size, self._buffer
                )
            elif self._spec.sensor_type == SensorType.DEPTH:
                self._buffer = np.empty(
                    (self._spec.resolution[0], self._spec.resolution[1]),
                    dtype=np.float32,
                )
                self.view = mn.MutableImageView2D(
                    mn.PixelFormat.R32F, size, self._buffer
                )
            else:
                self._buffer = np.empty(
                    (
                        self._spec.resolution[0],
                        self._spec.resolution[1],
                        self._spec.channels,
                    ),
                    dtype=np.uint8,
                )
                # The 3-D buffer is flattened to 2-D (rows x bytes-per-row)
                # before it is handed to the image view.
                self.view = mn.MutableImageView2D(
                    mn.PixelFormat.RGBA8_UNORM,
                    size,
                    self._buffer.reshape(self._spec.resolution[0], -1),
                )

        # "gpu_device_id" comes first so that a value supplied in the spec's
        # noise_model_kwargs overrides it.
        noise_model_kwargs = self._spec.noise_model_kwargs
        self._noise_model = make_sensor_noise_model(
            self._spec.noise_model,
            {"gpu_device_id": self._sim.gpu_device, **noise_model_kwargs},
        )
        assert self._noise_model.is_valid_sensor_type(
            self._spec.sensor_type
        ), "Noise model '{}' is not valid for sensor '{}'".format(
            self._spec.noise_model, self._spec.uuid
        )

    def draw_observation(self) -> None:
        r"""Render this sensor's observation with the simulator's renderer.

        Does nothing for AUDIO sensors, whose simulation runs when the
        observation is fetched.

        :raises habitat_sim.errors.InvalidAttachedObject: If the sensor object
            is no longer attached to a scene node.
        """
        # Batch rendering happens elsewhere.
        assert not self._sim.config.enable_batch_renderer

        if self._spec.sensor_type == SensorType.AUDIO:
            # do nothing in draw observation, get_observation will be called after this
            # run the simulation there
            return

        assert self._sim.renderer is not None
        # see if the sensor is attached to a scene graph, otherwise it is invalid,
        # and cannot make any observation
        if not self._sensor_object.object:
            # Adjacent literals are used instead of a backslash continuation,
            # which would embed the source indentation in the message.
            raise habitat_sim.errors.InvalidAttachedObject(
                "Sensor observation requested but sensor is invalid. "
                "(has it been detached from a scene node?)"
            )
        self._sim.renderer.draw(self._sensor_object, self._sim)

    def _draw_observation_async(self) -> None:
        r"""Enqueue an asynchronous draw job for this sensor.

        Does nothing for AUDIO sensors.  Otherwise the agent's scene node is
        re-parented to the root of the scene graph being rendered and a draw
        job targeting ``self.view`` is enqueued on the renderer.

        :raises RuntimeError: If a SEMANTIC sensor is drawn while the semantic
            scene graph differs from the active scene graph, or while no
            semantic scene is loaded.
        :raises habitat_sim.errors.InvalidAttachedObject: If the sensor object
            is no longer attached to a scene node.
        """
        # Batch rendering happens elsewhere.
        assert not self._sim.config.enable_batch_renderer

        if self._spec.sensor_type == SensorType.AUDIO:
            # do nothing in draw observation, get_observation will be called after this
            # run the simulation there
            return

        assert self._sim.renderer is not None
        is_semantic = self._spec.sensor_type == SensorType.SEMANTIC
        if (
            is_semantic
            and self._sim.get_active_scene_graph()
            is not self._sim.get_active_semantic_scene_graph()
        ):
            raise RuntimeError(
                "Async drawing doesn't support semantic rendering when there are multiple scene graphs"
            )
        # TODO: sync this path with renderer changes as above (render from sensor object)

        # see if the sensor is attached to a scene graph, otherwise it is invalid,
        # and cannot make any observation
        if not self._sensor_object.object:
            # Adjacent literals are used instead of a backslash continuation,
            # which would embed the source indentation in the message.
            raise habitat_sim.errors.InvalidAttachedObject(
                "Sensor observation requested but sensor is invalid. "
                "(has it been detached from a scene node?)"
            )

        # get the correct scene graph based on application
        if is_semantic:
            if self._sim.semantic_scene is None:
                raise RuntimeError(
                    "SemanticSensor observation requested but no SemanticScene is loaded"
                )
            scene = self._sim.get_active_semantic_scene_graph()
        else:  # SensorType is DEPTH or any other type
            scene = self._sim.get_active_scene_graph()

        # now, connect the agent to the root node of the current scene graph

        # sanity check is not needed on agent:
        # because if a sensor is attached to a scene graph,
        # it implies the agent is attached to the same scene graph
        # (it assumes backend simulator will guarantee it.)
        agent_node = self._agent.scene_node
        agent_node.parent = scene.get_root_node()

        # ``scene`` selected above is reused here; the scene graph is not
        # looked up a second time.
        render_flags = habitat_sim.gfx.Camera.Flags.NONE

        if self._sim.frustum_culling:
            render_flags |= habitat_sim.gfx.Camera.Flags.FRUSTUM_CULLING

        self._sim.renderer.enqueue_async_draw_job(
            self._sensor_object, scene, self.view, render_flags
        )

    def get_observation(self) -> Union[ndarray, "Tensor"]:
        r"""Read the most recently drawn frame into the buffer and return it.

        AUDIO sensors delegate to ``_get_audio_observation``.  Otherwise the
        sensor's render target is read into ``self._buffer``, the buffer is
        flipped along axis 0, and the noise model is applied to the result.

        NOTE(review): returns ``None`` when the batch renderer is enabled,
        which the return annotation does not cover.
        """
        if self._spec.sensor_type == SensorType.AUDIO:
            return self._get_audio_observation()

        # Placeholder until batch renderer emplaces the final value.
        if self._sim.config.enable_batch_renderer:
            return None

        assert self._sim.renderer is not None
        tgt = self._sensor_object.render_target

        if self._spec.gpu2gpu_transfer:
            # Run the reads with the buffer's CUDA device as the current device.
            with torch.cuda.device(self._buffer.device):  # type: ignore[attr-defined, union-attr]
                if self._spec.sensor_type == SensorType.SEMANTIC:
                    tgt.read_frame_object_id_gpu(self._buffer.data_ptr())  # type: ignore[attr-defined, union-attr]
                elif self._spec.sensor_type == SensorType.DEPTH:
                    tgt.read_frame_depth_gpu(self._buffer.data_ptr())  # type: ignore[attr-defined, union-attr]
                else:
                    tgt.read_frame_rgba_gpu(self._buffer.data_ptr())  # type: ignore[attr-defined, union-attr]

                # presumably the frame is stored bottom row first -- TODO confirm
                obs = self._buffer.flip(0)  # type: ignore[union-attr]
        else:
            # self.view wraps self._buffer, so reading into the view fills the buffer.
            if self._spec.sensor_type == SensorType.SEMANTIC:
                tgt.read_frame_object_id(self.view)
            elif self._spec.sensor_type == SensorType.DEPTH:
                tgt.read_frame_depth(self.view)
            else:
                tgt.read_frame_rgba(self.view)

            obs = np.flip(self._buffer, axis=0)

        return self._noise_model(obs)

    def _get_observation_async(self) -> Union[ndarray, "Tensor"]:
        r"""Return the observation produced by a finished async draw job.

        Unlike ``get_observation`` nothing is read from the render target
        here; the buffer is only flipped along axis 0 and passed through the
        noise model.  AUDIO sensors delegate to ``_get_audio_observation``.
        """
        if self._spec.sensor_type == SensorType.AUDIO:
            return self._get_audio_observation()

        frame = self._buffer
        flipped = (
            frame.flip(0)  # type: ignore[union-attr]
            if self._spec.gpu2gpu_transfer
            else np.flip(frame, axis=0)
        )
        return self._noise_model(flipped)

    def _get_audio_observation(self) -> Union[ndarray, "Tensor"]:
        r"""Run the audio simulation and return the sensor's impulse response.

        The listener transform is updated from the sensor node's absolute
        translation and the agent's current rotation before the simulation runs.

        :return: The value returned by the audio sensor's ``getIR()``.
        """
        assert self._spec.sensor_type == SensorType.AUDIO
        # Use the sensor object this wrapper was created for rather than
        # looking it up under the fixed uuid "audio_sensor".  This is the same
        # object when the sensor has that uuid, and also works for any other uuid.
        audio_sensor = self._sensor_object
        # tell the audio sensor about the agent location
        rot = self._agent.state.rotation

        audio_sensor.setAudioListenerTransform(
            audio_sensor.node.absolute_translation,  # set the listener position
            np.array([rot.w, rot.x, rot.y, rot.z]),  # set the listener orientation
        )

        # run the simulation
        audio_sensor.runSimulation(self._sim)
        return audio_sensor.getIR()

    def close(self) -> None:
        r"""Drop the references to the simulator, agent and sensor object."""
        # Chained assignment binds the targets left to right.
        self._sim = self._agent = self._sensor_object = None