diff --git a/omnigibson/envs/data_wrapper.py b/omnigibson/envs/data_wrapper.py index 30326a463..0202e7877 100644 --- a/omnigibson/envs/data_wrapper.py +++ b/omnigibson/envs/data_wrapper.py @@ -5,6 +5,7 @@ from pathlib import Path import h5py +import imageio import torch as th import omnigibson as og @@ -86,14 +87,28 @@ def step(self, action): next_obs, reward, terminated, truncated, info = self.env.step(action) self.step_count += 1 + self._record_step_trajectory(action, next_obs, reward, terminated, truncated, info) + + return next_obs, reward, terminated, truncated, info + + def _record_step_trajectory(self, action, obs, reward, terminated, truncated, info): + """ + Record the current step data to the trajectory history + + Args: + action (th.Tensor): action deployed resulting in @obs + obs (dict): state, i.e. observation + reward (float): reward, i.e. reward at this current timestep + terminated (bool): terminated, i.e. whether this episode ended due to a failure or success + truncated (bool): truncated, i.e. whether this episode ended due to a time limit etc. + info (dict): info, i.e. dictionary with any useful information + """ # Aggregate step data - step_data = self._parse_step_data(action, next_obs, reward, terminated, truncated, info) + step_data = self._parse_step_data(action, obs, reward, terminated, truncated, info) # Update obs and traj history self.current_traj_history.append(step_data) - self.current_obs = next_obs - - return next_obs, reward, terminated, truncated, info + self.current_obs = obs def _parse_step_data(self, action, obs, reward, terminated, truncated, info): """ @@ -188,13 +203,22 @@ def process_traj_to_hdf5(self, traj_data, traj_grp_name, nested_keys=("obs",)): return traj_grp + @property + def should_save_current_episode(self): + """ + Returns: + bool: Whether the current episode should be saved or discarded + """ + # Only save successful demos and if actually recording + success = self.env.task.success or not self.only_successes + return success and self.hdf5_file is not None + def flush_current_traj(self): """ Flush current trajectory data """ # Only save successful demos and if actually recording - success = self.env.task.success or not self.only_successes - if success and self.hdf5_file is not None: + if self.should_save_current_episode: traj_grp_name = f"demo_{self.traj_count}" self.process_traj_to_hdf5(self.current_traj_history, traj_grp_name, nested_keys=["obs"]) self.traj_count += 1 @@ -251,7 +275,9 @@ class DataCollectionWrapper(DataWrapper): dataset! """ - def __init__(self, env, output_path, viewport_camera_path="/World/viewer_camera", only_successes=True): + def __init__( + self, env, output_path, viewport_camera_path="/World/viewer_camera", only_successes=True, use_vr=False + ): """ Args: env (Environment): The environment to wrap @@ -259,6 +285,7 @@ def __init__(self, env, output_path, viewport_camera_path="/World/viewer_camera" viewport_camera_path (str): prim path to the camera to use when rendering the main viewport during data collection only_successes (bool): Whether to only save successful episodes + use_vr (bool): Whether to use VR headset for data collection """ # Store additional variables needed for optimized data collection @@ -269,6 +296,9 @@ def __init__(self, env, output_path, viewport_camera_path="/World/viewer_camera" # the given simulator step. See add_transition_info() for more info self.current_transitions = dict() + self._is_recording = True + self.use_vr = use_vr + # Add callbacks on import / remove objects and systems og.sim.add_callback_on_system_init( name="data_collection", callback=lambda system: self.add_transition_info(obj=system, add=True) @@ -289,6 +319,18 @@ def __init__(self, env, output_path, viewport_camera_path="/World/viewer_camera" # Configure the simulator to optimize for data collection self._optimize_sim_for_data_collection(viewport_camera_path=viewport_camera_path) + @property + def is_recording(self): + return self._is_recording + + @is_recording.setter + def is_recording(self, value: bool): + self._is_recording = value + + def _record_step_trajectory(self, action, obs, reward, terminated, truncated, info): + if self.is_recording: + super()._record_step_trajectory(action, obs, reward, terminated, truncated, info) + def _optimize_sim_for_data_collection(self, viewport_camera_path): """ Configures the simulator to optimize for data collection @@ -309,12 +351,14 @@ def _optimize_sim_for_data_collection(self, viewport_camera_path): # toggling these settings to be True -> False -> True # Only setting it to True once will actually freeze the GUI for some reason! if not gm.HEADLESS: - lazy.carb.settings.get_settings().set_bool("/app/asyncRendering", True) - lazy.carb.settings.get_settings().set_bool("/app/asyncRenderingLowLatency", True) - lazy.carb.settings.get_settings().set_bool("/app/asyncRendering", False) - lazy.carb.settings.get_settings().set_bool("/app/asyncRenderingLowLatency", False) - lazy.carb.settings.get_settings().set_bool("/app/asyncRendering", True) - lazy.carb.settings.get_settings().set_bool("/app/asyncRenderingLowLatency", True) + # Async rendering does not work in VR mode + if not self.use_vr: + lazy.carb.settings.get_settings().set_bool("/app/asyncRendering", True) + lazy.carb.settings.get_settings().set_bool("/app/asyncRenderingLowLatency", True) + lazy.carb.settings.get_settings().set_bool("/app/asyncRendering", False) + lazy.carb.settings.get_settings().set_bool("/app/asyncRenderingLowLatency", False) + lazy.carb.settings.get_settings().set_bool("/app/asyncRendering", True) + lazy.carb.settings.get_settings().set_bool("/app/asyncRenderingLowLatency", True) # Disable mouse grabbing since we're only using the UI passively lazy.carb.settings.get_settings().set_bool("/physics/mouseInteractionEnabled", False) @@ -383,6 +427,11 @@ def flush_current_traj(self): self.max_state_size = 0 self.current_transitions = dict() + @property + def should_save_current_episode(self): + # In addition to default conditions, we only save the current episode if we are actually recording + return super().should_save_current_episode and self.is_recording + def add_transition_info(self, obj, add=True): """ Adds transition info to the current sim step for specific object @obj. @@ -469,6 +518,10 @@ def create_from_hdf5( if config["task"]["type"] == "BehaviorTask": config["task"]["online_object_sampling"] = False + # Because we're loading directly from the cached scene file, we need to disable any additional objects that are being added since + # they will already be cached in the original scene file + config["objects"] = [] + # Set observation modalities and update sensor config for robot_cfg in config["robots"]: robot_cfg["obs_modalities"] = robot_obs_modalities @@ -522,7 +575,7 @@ def _parse_step_data(self, action, obs, reward, terminated, truncated, info): step_data["truncated"] = truncated return step_data - def playback_episode(self, episode_id, record=True): + def playback_episode(self, episode_id, record=True, video_path=None, video_writer=None): """ Playback episode @episode_id, and optionally record observation data if @record is True @@ -530,7 +583,13 @@ def playback_episode(self, episode_id, record=True): episode_id (int): Episode to playback. This should be a valid demo ID number from the inputted collected data hdf5 file record (bool): Whether to record data during playback or not + video_path (None or str): If specified, path to write the playback video to + video_writer (None or str): If specified, an imageio video writer to use for writing the video (can be specified in place of @video_path) """ + using_external_writer = video_writer is not None + if video_writer is None and video_path is not None: + video_writer = imageio.get_writer(video_path, fps=30) + data_grp = self.input_hdf5["data"] assert f"demo_{episode_id}" in data_grp, f"No valid episode with ID {episode_id} found!" traj_grp = data_grp[f"demo_{episode_id}"] @@ -596,17 +655,33 @@ def playback_episode(self, episode_id, record=True): ) self.current_traj_history.append(step_data) + # If writing video, save the current frame + if video_writer is not None: + video_writer.append_data(og.sim.viewer_camera.get_obs()[0]["rgb"][:, :, :3].numpy()) + self.step_count += 1 if record: self.flush_current_traj() - def playback_dataset(self, record=True): + # If we weren't using an external writer but we're still writing a video, close the writer + if video_writer is not None and not using_external_writer: + video_writer.close() + + def playback_dataset(self, record=True, video_path=None, video_writer=None): """ Playback all episodes from the input HDF5 file, and optionally record observation data if @record is True Args: record (bool): Whether to record data during playback or not + video_path (None or str): If specified, path to write the playback video to + video_writer (None or str): If specified, an imageio video writer to use for writing the video (can be specified in place of @video_path) """ + if video_writer is None and video_path is not None: + video_writer = imageio.get_writer(video_path, fps=30) for episode_id in range(self.input_hdf5["data"].attrs["n_episodes"]): - self.playback_episode(episode_id=episode_id, record=record) + self.playback_episode(episode_id=episode_id, record=record, video_path=None, video_writer=video_writer) + + # Close the video writer at the end if created + if video_writer is not None: + video_writer.close()