Skip to content

Commit

Permalink
v1.0.1; removed unused functions in utils.py
Browse files Browse the repository at this point in the history
  • Loading branch information
simon-bachhuber committed May 31, 2024
1 parent ef1d0fd commit 090a1fd
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 157 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "imt-diodem"
version = "1.0.0"
version = "1.0.1"
authors = [
{ name="Simon Bachhuber", email="simon.bachhuber@fau.de" },
]
Expand Down
158 changes: 2 additions & 156 deletions src/diodem/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import math
import os
from pathlib import Path
from typing import Optional
from typing import Optional, TypeVar
import warnings

import numpy as np
Expand All @@ -10,18 +8,8 @@
from qmt import vecInterp
from scipy.interpolate import CubicSpline
import tree
import tree_utils
from tree_utils import PyTree


def crop_sequence(data: dict, dt: float, t1: float = 0.0, t2: Optional[float] = None):
# crop time left and right
if t2 is None:
t2i = tree_utils.tree_shape(data)
else:
t2i = int(t2 / dt)
t1i = int(t1 / dt)
return tree.map_structure(lambda arr: np.array(arr)[t1i:t2i], data)
PyTree = TypeVar("PyTree")


def crop_tail(
Expand Down Expand Up @@ -166,145 +154,3 @@ def _cubic_interpolation(signal: np.ndarray, ts_out: np.ndarray):
ts_in = np.arange(len(signal))
interp_1D = lambda arr: (CubicSpline(ts_in, arr)(ts_out))
return np.array([interp_1D(signal[:, i]) for i in range(signal.shape[1])]).T


def autodetermine_imu_freq(path_imu_folder: str) -> int:
hz = []
for file in os.listdir(path_imu_folder):
file = Path(path_imu_folder).joinpath(file)
if file.suffix != ".txt":
continue

with open(file) as f:
f.readline()
# second line in txt file is: // Update Rate: 40.0Hz
second_line = f.readline()
before = len("// Update Rate:")
hz.append(int(float(second_line[before:-3])))

assert len(set(hz)) == 1, f"IMUs have multiple sampling rates {hz}"
return hz[0]


def autodetermine_optitrack_freq(path_optitrack: str):
def find_framerate_in_line(line: str, key: str):
before = line.find(key) + len(key) + 1
return int(float(line[before:].split(",")[0]))

# first line is:
# ...,Capture Frame Rate,120.000000,Export Frame Rate,120.000000,...
with open(path_optitrack) as f:
line = f.readline()
hz_cap = find_framerate_in_line(line, "Capture Frame Rate")
hz_exp = find_framerate_in_line(line, "Export Frame Rate")
if hz_cap != hz_exp:
warnings.warn(
f"Capture ({hz_cap}) and exported ({hz_exp}) frame rate are not equal"
)

return hz_exp


def autodetermine_imu_file_prefix(path_imu_folder: str) -> str:
prefixes = []
for file in os.listdir(path_imu_folder):
if file[-4:] != ".txt":
continue
prefixes.append(file[:-6])

assert len(set(prefixes)) == 1, f"IMUs have multiple different prefixes {prefixes}"
return prefixes[0]


_POSSIBLE_DELIMITERS = [";", "\t"]


def autodetermine_imu_file_delimiter(path_imu_folder: str) -> str:
delimiters = []
for file in os.listdir(path_imu_folder):
file = Path(path_imu_folder).joinpath(file)
if file.suffix != ".txt":
continue

with open(file) as f:
# throw away header
for _ in range(10):
f.readline()
# read in some data row
data_row = f.readline()
for delim in _POSSIBLE_DELIMITERS:
# 9D IMU + packet count = 10, or
# 9D IMU + packet count + finite time + quat estimate = 15
if len(data_row.split(delim)) in [10, 15]:
delimiters.append(delim)
break
else:
raise Exception(f"No possible delimiter found for row={data_row}")

assert (
len(set(delimiters)) == 1
), f"IMUs have multiple different delimiters {delimiters}"
return delimiters[0]


def autodetermine_space_units(path_optitrack: str) -> float:
# Example first row of header
# Format Version,1.23,Take Name,S_04,Take Notes,,Capture Frame Rate,30.000000,
# Export Frame Rate,120.000000,Capture Start Time,2023-06-02 12.09.45.344 PM,
# Capture Start Frame,246045,Total Frames in Take,19421,Total Exported Frames,
# 77681,Rotation Type,Quaternion,Length Units,Meters,Coordinate Space,Global

def find_length_units_in_line(line: str, key: str):
before = line.find(key) + len(key) + 1
return line[before:].split(",")[0]

# first line is:
# ...,Capture Frame Rate,120.000000,Export Frame Rate,120.000000,...
with open(path_optitrack) as f:
line = f.readline()
units = find_length_units_in_line(line, "Length Units")

return {"Meters": 1.0, "Millimeters": 1000.0}[units]


# could use instead of `qmt.nanInterp`
def _interp_nan_values(arr: np.ndarray, interp_fn):
"""Interpolate intermediate nan-values, and crop nan beginning and end.
Args:
arr (np.ndarray): NxF array.
interp_fn: (2xF, float) -> F,
"""
assert arr.ndim == 2

nan_values = []
current_idx = -1
current_run = 0
for i in range(len(arr)):
if np.any(np.isnan(arr[i])):
if current_run == 0:
current_idx = i
current_run += 1
else:
if current_run > 0:
nan_values.append((current_idx, current_run))
current_run = 0

for start, length in nan_values:
for i in range(length):
alpha = (i + 1) / (length + 1)
left = start - 1 if start != 0 else 0
arr[start + i] = interp_fn(arr[[left, start + length]], alpha)

# now `arr` has no more NaNs except if very first or very last value was NaN
for start in range(len(arr)):
if np.any(np.isnan(arr[start])):
continue
break

for stop in range(len(arr) - 1, -1, -1):
if np.any(np.isnan(arr[stop])):
continue
break

return arr[start:stop]

0 comments on commit 090a1fd

Please sign in to comment.