Merge pull request #1061 from StanfordVL/feat/data-wrapper-vr
VR support for Data Wrapper
hang-yin authored Jan 27, 2025
2 parents 00d8193 + 0293a4b commit 6203d71
107 changes: 91 additions & 16 deletions omnigibson/envs/data_wrapper.py
@@ -5,6 +5,7 @@
from pathlib import Path

import h5py
import imageio
import torch as th

import omnigibson as og
@@ -86,14 +87,28 @@ def step(self, action):
next_obs, reward, terminated, truncated, info = self.env.step(action)
self.step_count += 1

self._record_step_trajectory(action, next_obs, reward, terminated, truncated, info)

return next_obs, reward, terminated, truncated, info

def _record_step_trajectory(self, action, obs, reward, terminated, truncated, info):
"""
Record the current step data to the trajectory history
Args:
action (th.Tensor): action deployed resulting in @obs
obs (dict): state, i.e. observation
reward (float): reward, i.e. reward at this current timestep
terminated (bool): terminated, i.e. whether this episode ended due to a failure or success
truncated (bool): truncated, i.e. whether this episode ended due to a time limit etc.
info (dict): info, i.e. dictionary with any useful information
"""
# Aggregate step data
step_data = self._parse_step_data(action, next_obs, reward, terminated, truncated, info)
step_data = self._parse_step_data(action, obs, reward, terminated, truncated, info)

# Update obs and traj history
self.current_traj_history.append(step_data)
self.current_obs = next_obs

return next_obs, reward, terminated, truncated, info
self.current_obs = obs

def _parse_step_data(self, action, obs, reward, terminated, truncated, info):
"""
@@ -188,13 +203,22 @@ def process_traj_to_hdf5(self, traj_data, traj_grp_name, nested_keys=("obs",)):

return traj_grp

@property
def should_save_current_episode(self):
"""
Returns:
bool: Whether the current episode should be saved or discarded
"""
# Only save successful demos and if actually recording
success = self.env.task.success or not self.only_successes
return success and self.hdf5_file is not None

def flush_current_traj(self):
"""
Flush current trajectory data
"""
# Only save successful demos and if actually recording
success = self.env.task.success or not self.only_successes
if success and self.hdf5_file is not None:
if self.should_save_current_episode:
traj_grp_name = f"demo_{self.traj_count}"
self.process_traj_to_hdf5(self.current_traj_history, traj_grp_name, nested_keys=["obs"])
self.traj_count += 1
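Exposing the save condition as a should_save_current_episode property lets subclasses layer extra criteria on top of the default success/recording check via super(), which is exactly how DataCollectionWrapper combines it with is_recording later in this diff. A purely illustrative sketch (hypothetical, not part of this PR) that additionally drops very short episodes:

class MinLengthDataWrapper(DataWrapper):
    """Hypothetical wrapper that also discards episodes shorter than a minimum length."""

    MIN_EPISODE_STEPS = 10

    @property
    def should_save_current_episode(self):
        long_enough = len(self.current_traj_history) >= self.MIN_EPISODE_STEPS
        return super().should_save_current_episode and long_enough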
@@ -251,14 +275,17 @@ class DataCollectionWrapper(DataWrapper):
dataset!
"""

def __init__(self, env, output_path, viewport_camera_path="/World/viewer_camera", only_successes=True):
def __init__(
self, env, output_path, viewport_camera_path="/World/viewer_camera", only_successes=True, use_vr=False
):
"""
Args:
env (Environment): The environment to wrap
output_path (str): path to store hdf5 data file
viewport_camera_path (str): prim path to the camera to use when rendering the main viewport during
data collection
only_successes (bool): Whether to only save successful episodes
use_vr (bool): Whether to use VR headset for data collection
"""
# Store additional variables needed for optimized data collection

@@ -269,6 +296,9 @@ def __init__(self, env, output_path, viewport_camera_path="/World/viewer_camera"
# the given simulator step. See add_transition_info() for more info
self.current_transitions = dict()

self._is_recording = True
self.use_vr = use_vr

# Add callbacks on import / remove objects and systems
og.sim.add_callback_on_system_init(
name="data_collection", callback=lambda system: self.add_transition_info(obj=system, add=True)
@@ -289,6 +319,18 @@ def __init__(self, env, output_path, viewport_camera_path="/World/viewer_camera"
# Configure the simulator to optimize for data collection
self._optimize_sim_for_data_collection(viewport_camera_path=viewport_camera_path)

@property
def is_recording(self):
return self._is_recording

@is_recording.setter
def is_recording(self, value: bool):
self._is_recording = value

def _record_step_trajectory(self, action, obs, reward, terminated, truncated, info):
if self.is_recording:
super()._record_step_trajectory(action, obs, reward, terminated, truncated, info)
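Together, use_vr and the is_recording setter let a VR teleoperator pause data capture without tearing down the environment: steps keep executing, they just stop being appended to the trajectory history. A minimal usage sketch, where env is an og.Environment created elsewhere and get_teleop_action() / pause_pressed() are stand-ins for whatever VR input source you poll:

wrapped_env = DataCollectionWrapper(env, output_path="demos.hdf5", use_vr=True, only_successes=False)

for _ in range(1000):
    action = get_teleop_action()                    # stand-in for a VR / teleop action source
    wrapped_env.is_recording = not pause_pressed()  # hypothetical pause flag; un-recorded steps still run
    obs, reward, terminated, truncated, info = wrapped_env.step(action)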

def _optimize_sim_for_data_collection(self, viewport_camera_path):
"""
Configures the simulator to optimize for data collection
Expand All @@ -309,12 +351,14 @@ def _optimize_sim_for_data_collection(self, viewport_camera_path):
# toggling these settings to be True -> False -> True
# Only setting it to True once will actually freeze the GUI for some reason!
if not gm.HEADLESS:
lazy.carb.settings.get_settings().set_bool("/app/asyncRendering", True)
lazy.carb.settings.get_settings().set_bool("/app/asyncRenderingLowLatency", True)
lazy.carb.settings.get_settings().set_bool("/app/asyncRendering", False)
lazy.carb.settings.get_settings().set_bool("/app/asyncRenderingLowLatency", False)
lazy.carb.settings.get_settings().set_bool("/app/asyncRendering", True)
lazy.carb.settings.get_settings().set_bool("/app/asyncRenderingLowLatency", True)
# Async rendering does not work in VR mode
if not self.use_vr:
lazy.carb.settings.get_settings().set_bool("/app/asyncRendering", True)
lazy.carb.settings.get_settings().set_bool("/app/asyncRenderingLowLatency", True)
lazy.carb.settings.get_settings().set_bool("/app/asyncRendering", False)
lazy.carb.settings.get_settings().set_bool("/app/asyncRenderingLowLatency", False)
lazy.carb.settings.get_settings().set_bool("/app/asyncRendering", True)
lazy.carb.settings.get_settings().set_bool("/app/asyncRenderingLowLatency", True)

# Disable mouse grabbing since we're only using the UI passively
lazy.carb.settings.get_settings().set_bool("/physics/mouseInteractionEnabled", False)
@@ -383,6 +427,11 @@ def flush_current_traj(self):
self.max_state_size = 0
self.current_transitions = dict()

@property
def should_save_current_episode(self):
# In addition to default conditions, we only save the current episode if we are actually recording
return super().should_save_current_episode and self.is_recording

def add_transition_info(self, obj, add=True):
"""
Adds transition info to the current sim step for specific object @obj.
@@ -469,6 +518,10 @@ def create_from_hdf5(
if config["task"]["type"] == "BehaviorTask":
config["task"]["online_object_sampling"] = False

# Because we're loading directly from the cached scene file, we need to disable any additional objects that are being added since
# they will already be cached in the original scene file
config["objects"] = []

# Set observation modalities and update sensor config
for robot_cfg in config["robots"]:
robot_cfg["obs_modalities"] = robot_obs_modalities
@@ -522,15 +575,21 @@ def _parse_step_data(self, action, obs, reward, terminated, truncated, info):
step_data["truncated"] = truncated
return step_data

def playback_episode(self, episode_id, record=True):
def playback_episode(self, episode_id, record=True, video_path=None, video_writer=None):
"""
Playback episode @episode_id, and optionally record observation data if @record is True
Args:
episode_id (int): Episode to playback. This should be a valid demo ID number from the inputted collected
data hdf5 file
record (bool): Whether to record data during playback or not
video_path (None or str): If specified, path to write the playback video to
video_writer (None or imageio.Writer): If specified, an imageio video writer to use for writing the video (can be specified in place of @video_path)
"""
using_external_writer = video_writer is not None
if video_writer is None and video_path is not None:
video_writer = imageio.get_writer(video_path, fps=30)

data_grp = self.input_hdf5["data"]
assert f"demo_{episode_id}" in data_grp, f"No valid episode with ID {episode_id} found!"
traj_grp = data_grp[f"demo_{episode_id}"]
@@ -596,17 +655,33 @@ def playback_episode(self, episode_id, record=True):
)
self.current_traj_history.append(step_data)

# If writing video, save the current frame
if video_writer is not None:
video_writer.append_data(og.sim.viewer_camera.get_obs()[0]["rgb"][:, :, :3].numpy())

self.step_count += 1

if record:
self.flush_current_traj()

def playback_dataset(self, record=True):
# If we weren't using an external writer but we're still writing a video, close the writer
if video_writer is not None and not using_external_writer:
video_writer.close()
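Because only writers created internally from @video_path are closed here, an externally supplied writer can be reused across calls to render several episodes into a single file. A small usage sketch; playback_env stands in for an already-constructed DataPlaybackWrapper:

import imageio

writer = imageio.get_writer("demos_0_to_2.mp4", fps=30)  # one shared writer for several episodes
for episode_id in range(3):
    playback_env.playback_episode(episode_id=episode_id, record=False, video_writer=writer)
writer.close()  # caller-owned writer, so the caller closes it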

def playback_dataset(self, record=True, video_path=None, video_writer=None):
"""
Playback all episodes from the input HDF5 file, and optionally record observation data if @record is True
Args:
record (bool): Whether to record data during playback or not
video_path (None or str): If specified, path to write the playback video to
video_writer (None or imageio.Writer): If specified, an imageio video writer to use for writing the video (can be specified in place of @video_path)
"""
if video_writer is None and video_path is not None:
video_writer = imageio.get_writer(video_path, fps=30)
for episode_id in range(self.input_hdf5["data"].attrs["n_episodes"]):
self.playback_episode(episode_id=episode_id, record=record)
self.playback_episode(episode_id=episode_id, record=record, video_path=None, video_writer=video_writer)

# Close the video writer at the end if created
if video_writer is not None:
video_writer.close()
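With these arguments, replaying and re-rendering an entire collected dataset becomes a one-liner. A minimal sketch, again assuming playback_env is a DataPlaybackWrapper built from a collected hdf5 file (e.g. via create_from_hdf5, whose full argument list is not shown in this diff); passing video_path lets the wrapper own the writer's lifecycle, while video_writer keeps that in the caller's hands:

playback_env.playback_dataset(record=True, video_path="./dataset_replay.mp4")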
