-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Kalman Filter to get rid of noise from the CV sensor (#33)
* created kalman filter library and integrated it with main, added params file for kalman filter params * formatted files * got rid of erroneous files left over from another PR 😭 * removed more erroneous files * reverted accidental delete * added function for visualization the data * added visualizer and integrated into main * addressed more PR comments * implemented further PR changes * added array for acceleration visualization * created some hacky test code * fixed some imports * fixed index error * added code to run for 1000 iterations then visualize * fixed matplotlib issues * removed some print statements * removed an unnecessary function * created arguments to pass into system for easier tuning * formatted the file * made the arguments work * fixed some argument issues * fixed more issues * fixed import issue * implemented logging of noisy measurements * fixed writing to the JSON * commiting the recorded measured points * added offline tuning capability in kalman filter library * commiting a file with less points for debugging * fixed issue where cv data wasn't getting updated and logged properly * added append function to the hacky code * more fixes * updated measurements * quick test * updating the measured_points.json file * updated points to tune on * changes to kalman filter model * more new testing artifacts * new measured points * applied a morphological mask for better tuning * made some changes to CV system * issues with kalman filter initialization * added pause time in camera * more testing code 💀 * another fix * another change to get it working * updating measurement * fixed the sawtoothing issue with kalman filter * removed some comments * more fixes * cleaned up the filter a bit more * more cleanup * reverted some debug artifacts * reverted an unnecessary mask * nuked a bunch unnecessary shit * cleaned up main.py * got rid of visualizer from main * Integration changes --------- Co-authored-by: upadhyaydhruv <dmupadhyay12@gmail.com> Co-authored-by: 380 Pi <sahil.kale1@gmail.com>
- Loading branch information
1 parent
64e3b73
commit ee7f3d5
Showing
4 changed files
with
1,236 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,209 @@ | ||
import numpy as np | ||
import matplotlib.pyplot as plt | ||
from point import Point | ||
import argparse | ||
import os | ||
import json | ||
|
||
|
||
class KalmanFilter: | ||
def __init__(self, dt, Q=0.01, R=5.0, outlier_threshold=0.2): | ||
self.K = 1.0 # Initial kalman gain | ||
self.dt = dt | ||
self.X = np.zeros( | ||
(6, 1) | ||
) # [x, y, vx, vy, ax, ay]^T - current estimate of the ball state | ||
self.P = np.eye(6) * 1000 # Large initial uncertainty | ||
|
||
self.A = np.array( | ||
[ | ||
[1, 0, self.dt, 0, 0.5 * self.dt**2, 0], | ||
[0, 1, 0, self.dt, 0, 0.5 * self.dt**2], | ||
[0, 0, 1, 0, self.dt, 0], | ||
[0, 0, 0, 1, 0, self.dt], | ||
[0, 0, 0, 0, 1, 0], | ||
[0, 0, 0, 0, 0, 1], | ||
] | ||
) # Transition matrix to get state estimate given previous estimate | ||
|
||
self.H = np.array( | ||
[[1, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0]] | ||
) # Measurement matrix | ||
|
||
self.Q = np.eye(6) * Q # Process noise covariance | ||
self.R = np.eye(2) * R # Measurement noise covariance | ||
|
||
# Arrays for visualization of data | ||
self.noisy_positions_x = [] | ||
self.noisy_positions_y = [] | ||
self.filtered_positions_x = [] | ||
self.filtered_positions_y = [] | ||
self.filtered_velocity_x = [] | ||
self.filtered_velocity_y = [] | ||
self.filtered_acceleration_x = [] | ||
self.filtered_acceleration_y = [] | ||
|
||
self.most_recent_measurement_x = 0 | ||
self.most_recent_measurement_y = 0 | ||
|
||
self.new_state = [0, 0, 0, 0, 0, 0] | ||
|
||
def predict(self): | ||
self.X = ( | ||
self.A @ self.X | ||
) # Get predicted next state based on the transition matrix | ||
self.P = ( | ||
self.A @ self.P @ self.A.T + self.Q | ||
) # Update the sensor covariance matrix | ||
|
||
self.new_state = [ | ||
self.X[0, 0], | ||
self.X[1, 0], | ||
self.X[2, 0], | ||
self.X[3, 0], | ||
self.X[4, 0], | ||
self.X[5, 0], | ||
] | ||
|
||
return Point(self.new_state[0], self.new_state[1]) | ||
|
||
def update(self, measured_point): | ||
# Use the new measurements to update the prediction | ||
self.most_recent_measurement_x = measured_point.x | ||
self.most_recent_measurement_y = measured_point.y | ||
Z = np.array( | ||
[self.most_recent_measurement_x, self.most_recent_measurement_y] | ||
).reshape(2, 1) | ||
|
||
# Update error vector Y | ||
Y = Z - self.H @ self.X | ||
S = self.H @ self.P @ self.H.T + self.R | ||
self.K = self.P @ self.H.T @ np.linalg.inv(S) | ||
|
||
self.P = (np.eye(len(self.P)) - self.K @ self.H) @ self.P | ||
|
||
self.X = self.X + self.K @ Y # Update value based on the new kalman gain | ||
|
||
self.new_state = [ | ||
self.X[0, 0], | ||
self.X[1, 0], | ||
self.X[2, 0], | ||
self.X[3, 0], | ||
self.X[4, 0], | ||
self.X[5, 0], | ||
] | ||
|
||
return Point(self.new_state[0], self.new_state[1]) | ||
|
||
def visualize_data(self): | ||
plt.figure(figsize=(10, 6)) | ||
|
||
# Plot noisy measurements | ||
plt.scatter( | ||
self.noisy_positions_x, | ||
self.noisy_positions_y, | ||
color="red", | ||
label="Noisy Data", | ||
s=10, | ||
alpha=0.5, | ||
) | ||
|
||
# Plot filtered estimates | ||
plt.plot( | ||
self.filtered_positions_x, | ||
self.filtered_positions_y, | ||
color="blue", | ||
label="Filtered Position Data (Kalman)", | ||
linewidth=2, | ||
) | ||
|
||
# Labeling | ||
plt.title("Kalman Filter: Noisy vs. Filtered Position") | ||
plt.xlabel("X Position") | ||
plt.ylabel("Y Position") | ||
plt.legend() | ||
plt.grid(True) | ||
|
||
# Show plot | ||
plt.show() | ||
|
||
def append_noisy_measurement(self): | ||
self.noisy_positions_x.append(self.most_recent_measurement_x) | ||
self.noisy_positions_y.append(self.most_recent_measurement_y) | ||
|
||
self.filtered_positions_x.append(self.new_state[0]) | ||
self.filtered_positions_y.append(self.new_state[1]) | ||
|
||
self.filtered_velocity_x.append(self.new_state[2]) | ||
self.filtered_velocity_y.append(self.new_state[3]) | ||
|
||
self.filtered_acceleration_x.append(self.new_state[4]) | ||
self.filtered_acceleration_y.append(self.new_state[5]) | ||
|
||
|
||
def main(Q_val, R_val, dt_val): | ||
current_dir = os.path.dirname(os.path.realpath(__file__)) | ||
measured_points_file_path = os.path.join(current_dir, "measured_points.json") | ||
with open(measured_points_file_path, "r") as file: | ||
data = json.load(file) # Load JSON data as a dictionary | ||
|
||
measured_points_x = data["x"] | ||
measured_points_y = data["y"] | ||
|
||
kalman_filter = KalmanFilter(Q_val, R_val, dt_val) | ||
|
||
for i in range(500): | ||
noisy_x = measured_points_x[i] | ||
noisy_y = measured_points_y[i] | ||
|
||
if noisy_x is not None and noisy_y is not None: | ||
if ( | ||
kalman_filter.most_recent_measurement_x is None | ||
or kalman_filter.most_recent_measurement_y is None | ||
): | ||
current_measurement = Point(noisy_x, noisy_y) | ||
|
||
elif ( | ||
np.abs(noisy_x - kalman_filter.most_recent_measurement_x) < 0.02 | ||
and np.abs(noisy_y - kalman_filter.most_recent_measurement_y) < 0.02 | ||
): | ||
current_measurement = Point(noisy_x, noisy_y) | ||
|
||
else: | ||
current_measurement = None | ||
|
||
else: | ||
current_measurement = None | ||
|
||
filtered_state = kalman_filter.predict() | ||
|
||
if current_measurement is not None: | ||
filtered_state = kalman_filter.update(current_measurement) | ||
else: | ||
print("Ball not detected!!! Using old value for now") | ||
kalman_filter.most_recent_measurement_x = None | ||
kalman_filter.most_recent_measurement_y = None | ||
|
||
kalman_filter.append_noisy_measurement() | ||
|
||
kalman_filter.visualize_data() | ||
|
||
|
||
if __name__ == "__main__": | ||
# Set up argument parser | ||
parser = argparse.ArgumentParser( | ||
description="Run camera and Kalman filter with specified parameters." | ||
) | ||
|
||
parser.add_argument( | ||
"--Q", type=float, default=0.01, help="Process noise covariance" | ||
) | ||
parser.add_argument( | ||
"--R", type=float, default=5.0, help="Measurement noise covariance" | ||
) | ||
parser.add_argument("--dt", type=float, default=1.0, help="Time step interval") | ||
|
||
args = parser.parse_args() | ||
|
||
# Call main function with parsed arguments | ||
main(args.Q, args.R, args.dt) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.