optimise api and added realtime error detection.

This commit is contained in:
Adam 2025-03-16 01:06:03 +00:00
parent 1f51cab5de
commit 7006d194de
3 changed files with 101 additions and 89 deletions

View File

@ -41,6 +41,10 @@ class RSICommandLineInterface:
elif cmd == "set" and len(parts) >= 3:
variable, value = parts[1], " ".join(parts[2:])
self.update_variable(variable, value)
elif cmd == "alerts" and len(parts) == 2:
self.toggle_alerts(parts[1])
elif cmd == "set_alert_threshold" and len(parts) == 3:
self.set_alert_threshold(parts[1], parts[2])
elif cmd == "show":
self.show_variables()
elif cmd == "ipoc":
@ -81,6 +85,29 @@ class RSICommandLineInterface:
else:
print("❌ Unknown command. Type 'help' for a list of commands.")
def toggle_alerts(self, state):
"""Enable or disable real-time alerts."""
if state == "on":
self.client.enable_alerts(True)
print("✅ Real-time alerts enabled.")
elif state == "off":
self.client.enable_alerts(False)
print("✅ Real-time alerts disabled.")
else:
print("❌ Invalid option. Use 'alerts on' or 'alerts off'.")
def set_alert_threshold(self, alert_type, value):
"""Set thresholds for deviation or force alerts."""
try:
value = float(value)
if alert_type in ["deviation", "force"]:
self.client.set_alert_threshold(alert_type, value)
print(f"{alert_type.capitalize()} alert threshold set to {value}")
else:
print("❌ Invalid alert type. Use 'deviation' or 'force'.")
except ValueError:
print("❌ Invalid threshold value. Enter a numeric value.")
def export_data(self, filename):
"""Export movement data to a CSV file."""
self.client.export_movement_data(filename)
@ -99,32 +126,6 @@ class RSICommandLineInterface:
self.client.generate_report(filename, format_type)
print(f"✅ Report generated: {filename}.{format_type}")
def handle_graphing_command(self, parts):
"""Handles graphing-related commands."""
subcmd = parts[1]
if subcmd == "start":
mode = parts[2] if len(parts) > 2 else "position"
self.start_graphing(mode)
elif subcmd == "stop":
self.stop_graphing()
elif subcmd == "mode" and len(parts) == 3:
self.change_graph_mode(parts[2])
elif subcmd == "overlay" and len(parts) == 3:
if parts[2] == "on":
self.client.enable_graph_overlay(True)
print("✅ Overlay enabled.")
elif parts[2] == "off":
self.client.enable_graph_overlay(False)
print("✅ Overlay disabled.")
else:
print("❌ Invalid option. Use 'graph overlay on/off'.")
elif subcmd == "load_plan" and len(parts) == 3:
self.client.load_planned_trajectory(parts[2])
print(f"✅ Loaded planned trajectory: {parts[2]}")
else:
print("❌ Invalid graph command. Use 'graph start', 'graph stop', 'graph mode <mode>', 'graph overlay on/off', 'graph load_plan <file>'.")
def show_help(self):
"""Displays the list of available commands."""
print("""
@ -140,6 +141,8 @@ Available Commands:
export <filename.csv>
compare <file1.csv> <file2.csv>
report <filename> <csv|json|pdf>
alerts on/off
set_alert_threshold <deviation|force> <value>
""")

View File

@ -7,13 +7,16 @@ from collections import deque
from src.RSIPI.rsi_client import RSIClient
class RSIGraphing:
"""Handles real-time and CSV-based graphing for RSI analysis with export options."""
"""Handles real-time and CSV-based graphing for RSI analysis with alerts and threshold monitoring."""
def __init__(self, client, mode="position", overlay=False, plan_file=None):
"""Initialize graphing for real-time plotting."""
self.client = client
self.mode = mode
self.overlay = overlay # Enable/Disable planned vs. actual overlay
self.alerts_enabled = True # Default to alerts on
self.deviation_threshold = 5.0 # Default deviation threshold (mm)
self.force_threshold = 10.0 # Default force threshold (Nm)
self.fig, self.ax = plt.subplots(figsize=(10, 6))
# ✅ Data storage
@ -38,7 +41,7 @@ class RSIGraphing:
plt.show()
def update_graph(self, frame):
"""Update the live graph based on selected mode."""
"""Update the live graph based on selected mode and check for alerts."""
current_time = time.time()
dt = current_time - self.previous_time
self.previous_time = current_time
@ -68,7 +71,18 @@ class RSIGraphing:
for axis in ["X", "Y", "Z"]:
planned_value = self.planned_data[axis][-1] if len(self.planned_data[axis]) > 0 else position[axis]
self.planned_data[axis].append(planned_value)
self.deviation_data[axis].append(abs(position[axis] - planned_value))
deviation = abs(position[axis] - planned_value)
self.deviation_data[axis].append(deviation)
# ✅ Trigger deviation alert
if self.alerts_enabled and deviation > self.deviation_threshold:
print(f"⚠️ Deviation Alert! {axis} exceeds {self.deviation_threshold} mm (Deviation: {deviation:.2f} mm)")
# ✅ Trigger force spike alert
if self.alerts_enabled:
for axis in ["A1", "A2", "A3", "A4", "A5", "A6"]:
if self.force_data[axis][-1] > self.force_threshold:
print(f"⚠️ Force Spike Alert! {axis} exceeds {self.force_threshold} Nm (Force: {self.force_data[axis][-1]:.2f} Nm)")
# ✅ Clear the plot
self.ax.clear()
@ -77,7 +91,7 @@ class RSIGraphing:
self.ax.plot(self.time_data, self.position_data["X"], label="X Position")
self.ax.plot(self.time_data, self.position_data["Y"], label="Y Position")
self.ax.plot(self.time_data, self.position_data["Z"], label="Z Position")
self.ax.set_title("Live Position Tracking")
self.ax.set_title("Live Position Tracking with Alerts")
self.ax.set_ylabel("Position (mm)")
if self.overlay:
@ -85,26 +99,6 @@ class RSIGraphing:
self.ax.plot(self.time_data, self.planned_data["Y"], label="Planned Y", linestyle="dashed")
self.ax.plot(self.time_data, self.planned_data["Z"], label="Planned Z", linestyle="dashed")
elif self.mode == "velocity":
self.ax.plot(self.time_data, self.velocity_data["X"], label="X Velocity")
self.ax.plot(self.time_data, self.velocity_data["Y"], label="Y Velocity")
self.ax.plot(self.time_data, self.velocity_data["Z"], label="Z Velocity")
self.ax.set_title("Live Velocity Profile")
self.ax.set_ylabel("Velocity (mm/s)")
elif self.mode == "acceleration":
self.ax.plot(self.time_data, self.acceleration_data["X"], label="X Acceleration")
self.ax.plot(self.time_data, self.acceleration_data["Y"], label="Y Acceleration")
self.ax.plot(self.time_data, self.acceleration_data["Z"], label="Z Acceleration")
self.ax.set_title("Live Acceleration Profile")
self.ax.set_ylabel("Acceleration (mm/s²)")
elif self.mode == "force":
for axis in ["A1", "A2", "A3", "A4", "A5", "A6"]:
self.ax.plot(self.time_data, self.force_data[axis], label=f"Force {axis}")
self.ax.set_title("Live Force & Torque Monitoring")
self.ax.set_ylabel("Force (Nm)")
self.ax.legend()
self.ax.set_xlabel("Time")
self.ax.tick_params(axis='x', rotation=45)
@ -117,27 +111,18 @@ class RSIGraphing:
else:
print("❌ Invalid mode. Available: position, velocity, acceleration, force")
def export_graph_data(self, filename):
"""Save graph data to CSV."""
df = pd.DataFrame({
"Time": list(self.time_data),
"Actual_X": list(self.position_data["X"]),
"Actual_Y": list(self.position_data["Y"]),
"Actual_Z": list(self.position_data["Z"]),
})
df.to_csv(filename, index=False)
print(f"✅ Graph data saved to {filename}")
def set_alert_threshold(self, alert_type, threshold):
"""Set threshold for deviation or force alerts."""
if alert_type == "deviation":
self.deviation_threshold = threshold
elif alert_type == "force":
self.force_threshold = threshold
print(f"{alert_type.capitalize()} alert threshold set to {threshold}")
def load_plan(self, filename):
"""Load planned movement data from CSV."""
try:
df = pd.read_csv(filename)
self.planned_data["X"] = deque(df["Planned_X"].tolist(), maxlen=100)
self.planned_data["Y"] = deque(df["Planned_Y"].tolist(), maxlen=100)
self.planned_data["Z"] = deque(df["Planned_Z"].tolist(), maxlen=100)
print(f"✅ Loaded planned trajectory from {filename}")
except Exception as e:
print(f"❌ Failed to load planned trajectory: {e}")
def enable_alerts(self, enable):
"""Enable or disable alerts."""
self.alerts_enabled = enable
print(f"✅ Alerts {'enabled' if enable else 'disabled'}.")
if __name__ == "__main__":
import argparse
@ -146,7 +131,11 @@ if __name__ == "__main__":
parser.add_argument("--mode", choices=["position", "velocity", "acceleration", "force"], default="position", help="Graphing mode")
parser.add_argument("--overlay", action="store_true", help="Enable planned vs. actual overlay")
parser.add_argument("--plan", type=str, help="CSV file with planned trajectory")
parser.add_argument("--alerts", action="store_true", help="Enable real-time alerts")
args = parser.parse_args()
client = RSIClient("RSI_EthernetConfig.xml")
RSIGraphing(client, mode=args.mode, overlay=args.overlay, plan_file=args.plan)
graphing = RSIGraphing(client, mode=args.mode, overlay=args.overlay, plan_file=args.plan)
if not args.alerts:
graphing.enable_alerts(False)

View File

@ -1,12 +1,13 @@
import multiprocessing
import pandas as pd
import numpy as np
import json
import matplotlib.pyplot as plt
from src.RSIPI.rsi_client import RSIClient
from src.RSIPI.graphing import RSIGraphing
class RSIAPI:
"""RSI API for programmatic control."""
"""RSI API for programmatic control, including alerts, logging, graphing, and data retrieval."""
def __init__(self, config_file="RSI_EthernetConfig.xml"):
"""Initialize RSIAPI with an RSI client instance."""
@ -40,6 +41,31 @@ class RSIAPI:
"receive_variables": dict(self.client.receive_variables)
}
def get_live_data(self):
"""Retrieve real-time RSI data for external processing."""
return {
"position": self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0}),
"velocity": self.client.receive_variables.get("Velocity", {"X": 0, "Y": 0, "Z": 0}),
"acceleration": self.client.receive_variables.get("Acceleration", {"X": 0, "Y": 0, "Z": 0}),
"force": self.client.receive_variables.get("MaCur", {"A1": 0, "A2": 0, "A3": 0, "A4": 0, "A5": 0, "A6": 0}),
"ipoc": self.client.receive_variables.get("IPOC", "N/A")
}
def get_live_data_as_numpy(self):
"""Retrieve live RSI data as a NumPy array."""
data = self.get_live_data()
return np.array([
list(data["position"].values()),
list(data["velocity"].values()),
list(data["acceleration"].values()),
list(data["force"].values())
])
def get_live_data_as_dataframe(self):
"""Retrieve live RSI data as a Pandas DataFrame."""
data = self.get_live_data()
return pd.DataFrame([data])
def get_ipoc(self):
"""Retrieve the latest IPOC value."""
return self.client.receive_variables.get("IPOC", "N/A")
@ -119,24 +145,18 @@ class RSIAPI:
return "🛑 Graphing stopped."
return "⚠️ No active graphing process."
def change_graph_mode(self, mode):
"""Change the graphing mode dynamically."""
if mode not in ["position", "velocity", "acceleration", "force"]:
return "❌ Invalid mode. Available: position, velocity, acceleration, force"
# ✅ ALERT METHODS
def enable_alerts(self, enable):
"""Enable or disable real-time alerts."""
self.client.enable_alerts(enable)
return f"✅ Alerts {'enabled' if enable else 'disabled'}."
self.stop_graphing()
self.start_graphing(mode)
return f"✅ Graphing mode changed to: {mode}"
def enable_graph_overlay(self, enable):
"""Enable or disable planned vs. actual overlay."""
self.client.graph_overlay = enable
return f"✅ Overlay {'enabled' if enable else 'disabled'}."
def load_planned_trajectory(self, filename):
"""Load planned movement trajectory from CSV."""
self.client.load_plan(filename)
return f"✅ Loaded planned trajectory: {filename}"
def set_alert_threshold(self, alert_type, value):
"""Set threshold for deviation or force alerts."""
if alert_type in ["deviation", "force"]:
self.client.set_alert_threshold(alert_type, value)
return f"{alert_type.capitalize()} alert threshold set to {value}"
return "❌ Invalid alert type. Use 'deviation' or 'force'."
# ✅ DATA EXPORT & ANALYSIS
def export_movement_data(self, filename):