RSI-PI/src/RSIPI/rsi_graphing.py
2025-04-02 20:30:09 +01:00

145 lines
6.6 KiB
Python

import time
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from collections import deque
from .rsi_client import RSIClient
class RSIGraphing:
"""Handles real-time and CSV-based graphing for RSI analysis with alerts and threshold monitoring."""
def __init__(self, client, mode="position", overlay=False, plan_file=None):
"""Initialize graphing for real-time plotting."""
self.client = client
self.mode = mode
self.overlay = overlay # Enable/Disable planned vs. actual overlay
self.alerts_enabled = True # Default to alerts on
self.deviation_threshold = 5.0 # Default deviation threshold (mm)
self.force_threshold = 10.0 # Default force threshold (Nm)
self.fig, self.ax = plt.subplots(figsize=(10, 6))
# ✅ Data storage
self.time_data = deque(maxlen=100) # Store last 100 data points
self.position_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]}
self.velocity_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]}
self.acceleration_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]}
self.force_data = {axis: deque(maxlen=100) for axis in ["A1", "A2", "A3", "A4", "A5", "A6"]}
self.previous_positions = {"X": 0, "Y": 0, "Z": 0}
self.previous_velocities = {"X": 0, "Y": 0, "Z": 0}
self.previous_time = time.time()
# ✅ Planned movement overlay storage
self.planned_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]}
self.deviation_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]}
if plan_file:
self.load_plan(plan_file)
self.ani = animation.FuncAnimation(self.fig, self.update_graph, interval=100, cache_frame_data=False)
plt.show()
def update_graph(self, frame):
"""Update the live graph based on selected mode and check for alerts."""
current_time = time.time()
dt = current_time - self.previous_time
self.previous_time = current_time
# ✅ Fetch live data from RSI client
position = self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0})
force = self.client.receive_variables.get("MaCur", {"A1": 0, "A2": 0, "A3": 0, "A4": 0, "A5": 0, "A6": 0})
# ✅ Compute velocity and acceleration
for axis in ["X", "Y", "Z"]:
velocity = (position[axis] - self.previous_positions[axis]) / dt if dt > 0 else 0
acceleration = (velocity - self.previous_velocities[axis]) / dt if dt > 0 else 0
self.previous_positions[axis] = position[axis]
self.previous_velocities[axis] = velocity
self.position_data[axis].append(position[axis])
self.velocity_data[axis].append(velocity)
self.acceleration_data[axis].append(acceleration)
for axis in ["A1", "A2", "A3", "A4", "A5", "A6"]:
self.force_data[axis].append(force[axis])
self.time_data.append(time.strftime("%H:%M:%S"))
# ✅ Planned vs. Actual movement overlay
if self.overlay and self.planned_data:
for axis in ["X", "Y", "Z"]:
planned_value = self.planned_data[axis][-1] if len(self.planned_data[axis]) > 0 else position[axis]
self.planned_data[axis].append(planned_value)
deviation = abs(position[axis] - planned_value)
self.deviation_data[axis].append(deviation)
# ✅ Trigger deviation alert
if self.alerts_enabled and deviation > self.deviation_threshold:
print(f"⚠️ Deviation Alert! {axis} exceeds {self.deviation_threshold} mm (Deviation: {deviation:.2f} mm)")
# ✅ Trigger force spike alert
if self.alerts_enabled:
for axis in ["A1", "A2", "A3", "A4", "A5", "A6"]:
if self.force_data[axis][-1] > self.force_threshold:
print(f"⚠️ Force Spike Alert! {axis} exceeds {self.force_threshold} Nm (Force: {self.force_data[axis][-1]:.2f} Nm)")
# ✅ Clear the plot
self.ax.clear()
if self.mode == "position":
self.ax.plot(self.time_data, self.position_data["X"], label="X Position")
self.ax.plot(self.time_data, self.position_data["Y"], label="Y Position")
self.ax.plot(self.time_data, self.position_data["Z"], label="Z Position")
self.ax.set_title("Live Position Tracking with Alerts")
self.ax.set_ylabel("Position (mm)")
if self.overlay:
self.ax.plot(self.time_data, self.planned_data["X"], label="Planned X", linestyle="dashed")
self.ax.plot(self.time_data, self.planned_data["Y"], label="Planned Y", linestyle="dashed")
self.ax.plot(self.time_data, self.planned_data["Z"], label="Planned Z", linestyle="dashed")
self.ax.legend()
self.ax.set_xlabel("Time")
self.ax.tick_params(axis='x', rotation=45)
def change_mode(self, mode):
"""Change the graphing mode dynamically."""
if mode in ["position", "velocity", "acceleration", "force"]:
self.mode = mode
print(f"✅ Graphing mode changed to: {mode}")
else:
print("❌ Invalid mode. Available: position, velocity, acceleration, force")
def set_alert_threshold(self, alert_type, threshold):
"""Set threshold for deviation or force alerts."""
if alert_type == "deviation":
self.deviation_threshold = threshold
elif alert_type == "force":
self.force_threshold = threshold
print(f"{alert_type.capitalize()} alert threshold set to {threshold}")
def enable_alerts(self, enable):
"""Enable or disable alerts."""
self.alerts_enabled = enable
print(f"✅ Alerts {'enabled' if enable else 'disabled'}.")
def stop(self):
"""Stop live plotting loop by closing the figure."""
plt.close(self.fig)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="RSI Graphing Utility")
parser.add_argument("--mode", choices=["position", "velocity", "acceleration", "force"], default="position", help="Graphing mode")
parser.add_argument("--overlay", action="store_true", help="Enable planned vs. actual overlay")
parser.add_argument("--plan", type=str, help="CSV file with planned trajectory")
parser.add_argument("--alerts", action="store_true", help="Enable real-time alerts")
args = parser.parse_args()
client = RSIClient("RSI_EthernetConfig.xml")
graphing = RSIGraphing(client, mode=args.mode, overlay=args.overlay, plan_file=args.plan)
if not args.alerts:
graphing.enable_alerts(False)