diff --git a/src/RSIPI/cli.py b/src/RSIPI/cli.py index eb1e66b..497f0ee 100644 --- a/src/RSIPI/cli.py +++ b/src/RSIPI/cli.py @@ -41,6 +41,10 @@ class RSICommandLineInterface: elif cmd == "set" and len(parts) >= 3: variable, value = parts[1], " ".join(parts[2:]) self.update_variable(variable, value) + elif cmd == "alerts" and len(parts) == 2: + self.toggle_alerts(parts[1]) + elif cmd == "set_alert_threshold" and len(parts) == 3: + self.set_alert_threshold(parts[1], parts[2]) elif cmd == "show": self.show_variables() elif cmd == "ipoc": @@ -81,6 +85,29 @@ class RSICommandLineInterface: else: print("❌ Unknown command. Type 'help' for a list of commands.") + def toggle_alerts(self, state): + """Enable or disable real-time alerts.""" + if state == "on": + self.client.enable_alerts(True) + print("✅ Real-time alerts enabled.") + elif state == "off": + self.client.enable_alerts(False) + print("✅ Real-time alerts disabled.") + else: + print("❌ Invalid option. Use 'alerts on' or 'alerts off'.") + + def set_alert_threshold(self, alert_type, value): + """Set thresholds for deviation or force alerts.""" + try: + value = float(value) + if alert_type in ["deviation", "force"]: + self.client.set_alert_threshold(alert_type, value) + print(f"✅ {alert_type.capitalize()} alert threshold set to {value}") + else: + print("❌ Invalid alert type. Use 'deviation' or 'force'.") + except ValueError: + print("❌ Invalid threshold value. Enter a numeric value.") + def export_data(self, filename): """Export movement data to a CSV file.""" self.client.export_movement_data(filename) @@ -99,32 +126,6 @@ class RSICommandLineInterface: self.client.generate_report(filename, format_type) print(f"✅ Report generated: {filename}.{format_type}") - def handle_graphing_command(self, parts): - """Handles graphing-related commands.""" - subcmd = parts[1] - - if subcmd == "start": - mode = parts[2] if len(parts) > 2 else "position" - self.start_graphing(mode) - elif subcmd == "stop": - self.stop_graphing() - elif subcmd == "mode" and len(parts) == 3: - self.change_graph_mode(parts[2]) - elif subcmd == "overlay" and len(parts) == 3: - if parts[2] == "on": - self.client.enable_graph_overlay(True) - print("✅ Overlay enabled.") - elif parts[2] == "off": - self.client.enable_graph_overlay(False) - print("✅ Overlay disabled.") - else: - print("❌ Invalid option. Use 'graph overlay on/off'.") - elif subcmd == "load_plan" and len(parts) == 3: - self.client.load_planned_trajectory(parts[2]) - print(f"✅ Loaded planned trajectory: {parts[2]}") - else: - print("❌ Invalid graph command. Use 'graph start', 'graph stop', 'graph mode ', 'graph overlay on/off', 'graph load_plan '.") - def show_help(self): """Displays the list of available commands.""" print(""" @@ -140,6 +141,8 @@ Available Commands: export compare report + alerts on/off + set_alert_threshold """) diff --git a/src/RSIPI/graphing.py b/src/RSIPI/graphing.py index 726e6ad..3b5756c 100644 --- a/src/RSIPI/graphing.py +++ b/src/RSIPI/graphing.py @@ -7,13 +7,16 @@ from collections import deque from src.RSIPI.rsi_client import RSIClient class RSIGraphing: - """Handles real-time and CSV-based graphing for RSI analysis with export options.""" + """Handles real-time and CSV-based graphing for RSI analysis with alerts and threshold monitoring.""" def __init__(self, client, mode="position", overlay=False, plan_file=None): """Initialize graphing for real-time plotting.""" self.client = client self.mode = mode self.overlay = overlay # Enable/Disable planned vs. actual overlay + self.alerts_enabled = True # Default to alerts on + self.deviation_threshold = 5.0 # Default deviation threshold (mm) + self.force_threshold = 10.0 # Default force threshold (Nm) self.fig, self.ax = plt.subplots(figsize=(10, 6)) # ✅ Data storage @@ -38,7 +41,7 @@ class RSIGraphing: plt.show() def update_graph(self, frame): - """Update the live graph based on selected mode.""" + """Update the live graph based on selected mode and check for alerts.""" current_time = time.time() dt = current_time - self.previous_time self.previous_time = current_time @@ -68,7 +71,18 @@ class RSIGraphing: for axis in ["X", "Y", "Z"]: planned_value = self.planned_data[axis][-1] if len(self.planned_data[axis]) > 0 else position[axis] self.planned_data[axis].append(planned_value) - self.deviation_data[axis].append(abs(position[axis] - planned_value)) + deviation = abs(position[axis] - planned_value) + self.deviation_data[axis].append(deviation) + + # ✅ Trigger deviation alert + if self.alerts_enabled and deviation > self.deviation_threshold: + print(f"⚠️ Deviation Alert! {axis} exceeds {self.deviation_threshold} mm (Deviation: {deviation:.2f} mm)") + + # ✅ Trigger force spike alert + if self.alerts_enabled: + for axis in ["A1", "A2", "A3", "A4", "A5", "A6"]: + if self.force_data[axis][-1] > self.force_threshold: + print(f"⚠️ Force Spike Alert! {axis} exceeds {self.force_threshold} Nm (Force: {self.force_data[axis][-1]:.2f} Nm)") # ✅ Clear the plot self.ax.clear() @@ -77,7 +91,7 @@ class RSIGraphing: self.ax.plot(self.time_data, self.position_data["X"], label="X Position") self.ax.plot(self.time_data, self.position_data["Y"], label="Y Position") self.ax.plot(self.time_data, self.position_data["Z"], label="Z Position") - self.ax.set_title("Live Position Tracking") + self.ax.set_title("Live Position Tracking with Alerts") self.ax.set_ylabel("Position (mm)") if self.overlay: @@ -85,26 +99,6 @@ class RSIGraphing: self.ax.plot(self.time_data, self.planned_data["Y"], label="Planned Y", linestyle="dashed") self.ax.plot(self.time_data, self.planned_data["Z"], label="Planned Z", linestyle="dashed") - elif self.mode == "velocity": - self.ax.plot(self.time_data, self.velocity_data["X"], label="X Velocity") - self.ax.plot(self.time_data, self.velocity_data["Y"], label="Y Velocity") - self.ax.plot(self.time_data, self.velocity_data["Z"], label="Z Velocity") - self.ax.set_title("Live Velocity Profile") - self.ax.set_ylabel("Velocity (mm/s)") - - elif self.mode == "acceleration": - self.ax.plot(self.time_data, self.acceleration_data["X"], label="X Acceleration") - self.ax.plot(self.time_data, self.acceleration_data["Y"], label="Y Acceleration") - self.ax.plot(self.time_data, self.acceleration_data["Z"], label="Z Acceleration") - self.ax.set_title("Live Acceleration Profile") - self.ax.set_ylabel("Acceleration (mm/s²)") - - elif self.mode == "force": - for axis in ["A1", "A2", "A3", "A4", "A5", "A6"]: - self.ax.plot(self.time_data, self.force_data[axis], label=f"Force {axis}") - self.ax.set_title("Live Force & Torque Monitoring") - self.ax.set_ylabel("Force (Nm)") - self.ax.legend() self.ax.set_xlabel("Time") self.ax.tick_params(axis='x', rotation=45) @@ -117,27 +111,18 @@ class RSIGraphing: else: print("❌ Invalid mode. Available: position, velocity, acceleration, force") - def export_graph_data(self, filename): - """Save graph data to CSV.""" - df = pd.DataFrame({ - "Time": list(self.time_data), - "Actual_X": list(self.position_data["X"]), - "Actual_Y": list(self.position_data["Y"]), - "Actual_Z": list(self.position_data["Z"]), - }) - df.to_csv(filename, index=False) - print(f"✅ Graph data saved to {filename}") + def set_alert_threshold(self, alert_type, threshold): + """Set threshold for deviation or force alerts.""" + if alert_type == "deviation": + self.deviation_threshold = threshold + elif alert_type == "force": + self.force_threshold = threshold + print(f"✅ {alert_type.capitalize()} alert threshold set to {threshold}") - def load_plan(self, filename): - """Load planned movement data from CSV.""" - try: - df = pd.read_csv(filename) - self.planned_data["X"] = deque(df["Planned_X"].tolist(), maxlen=100) - self.planned_data["Y"] = deque(df["Planned_Y"].tolist(), maxlen=100) - self.planned_data["Z"] = deque(df["Planned_Z"].tolist(), maxlen=100) - print(f"✅ Loaded planned trajectory from {filename}") - except Exception as e: - print(f"❌ Failed to load planned trajectory: {e}") + def enable_alerts(self, enable): + """Enable or disable alerts.""" + self.alerts_enabled = enable + print(f"✅ Alerts {'enabled' if enable else 'disabled'}.") if __name__ == "__main__": import argparse @@ -146,7 +131,11 @@ if __name__ == "__main__": parser.add_argument("--mode", choices=["position", "velocity", "acceleration", "force"], default="position", help="Graphing mode") parser.add_argument("--overlay", action="store_true", help="Enable planned vs. actual overlay") parser.add_argument("--plan", type=str, help="CSV file with planned trajectory") + parser.add_argument("--alerts", action="store_true", help="Enable real-time alerts") args = parser.parse_args() client = RSIClient("RSI_EthernetConfig.xml") - RSIGraphing(client, mode=args.mode, overlay=args.overlay, plan_file=args.plan) + graphing = RSIGraphing(client, mode=args.mode, overlay=args.overlay, plan_file=args.plan) + + if not args.alerts: + graphing.enable_alerts(False) diff --git a/src/RSIPI/rsi_api.py b/src/RSIPI/rsi_api.py index 4052683..e245ab4 100644 --- a/src/RSIPI/rsi_api.py +++ b/src/RSIPI/rsi_api.py @@ -1,12 +1,13 @@ import multiprocessing import pandas as pd +import numpy as np import json import matplotlib.pyplot as plt from src.RSIPI.rsi_client import RSIClient from src.RSIPI.graphing import RSIGraphing class RSIAPI: - """RSI API for programmatic control.""" + """RSI API for programmatic control, including alerts, logging, graphing, and data retrieval.""" def __init__(self, config_file="RSI_EthernetConfig.xml"): """Initialize RSIAPI with an RSI client instance.""" @@ -40,6 +41,31 @@ class RSIAPI: "receive_variables": dict(self.client.receive_variables) } + def get_live_data(self): + """Retrieve real-time RSI data for external processing.""" + return { + "position": self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0}), + "velocity": self.client.receive_variables.get("Velocity", {"X": 0, "Y": 0, "Z": 0}), + "acceleration": self.client.receive_variables.get("Acceleration", {"X": 0, "Y": 0, "Z": 0}), + "force": self.client.receive_variables.get("MaCur", {"A1": 0, "A2": 0, "A3": 0, "A4": 0, "A5": 0, "A6": 0}), + "ipoc": self.client.receive_variables.get("IPOC", "N/A") + } + + def get_live_data_as_numpy(self): + """Retrieve live RSI data as a NumPy array.""" + data = self.get_live_data() + return np.array([ + list(data["position"].values()), + list(data["velocity"].values()), + list(data["acceleration"].values()), + list(data["force"].values()) + ]) + + def get_live_data_as_dataframe(self): + """Retrieve live RSI data as a Pandas DataFrame.""" + data = self.get_live_data() + return pd.DataFrame([data]) + def get_ipoc(self): """Retrieve the latest IPOC value.""" return self.client.receive_variables.get("IPOC", "N/A") @@ -119,24 +145,18 @@ class RSIAPI: return "🛑 Graphing stopped." return "⚠️ No active graphing process." - def change_graph_mode(self, mode): - """Change the graphing mode dynamically.""" - if mode not in ["position", "velocity", "acceleration", "force"]: - return "❌ Invalid mode. Available: position, velocity, acceleration, force" + # ✅ ALERT METHODS + def enable_alerts(self, enable): + """Enable or disable real-time alerts.""" + self.client.enable_alerts(enable) + return f"✅ Alerts {'enabled' if enable else 'disabled'}." - self.stop_graphing() - self.start_graphing(mode) - return f"✅ Graphing mode changed to: {mode}" - - def enable_graph_overlay(self, enable): - """Enable or disable planned vs. actual overlay.""" - self.client.graph_overlay = enable - return f"✅ Overlay {'enabled' if enable else 'disabled'}." - - def load_planned_trajectory(self, filename): - """Load planned movement trajectory from CSV.""" - self.client.load_plan(filename) - return f"✅ Loaded planned trajectory: {filename}" + def set_alert_threshold(self, alert_type, value): + """Set threshold for deviation or force alerts.""" + if alert_type in ["deviation", "force"]: + self.client.set_alert_threshold(alert_type, value) + return f"✅ {alert_type.capitalize()} alert threshold set to {value}" + return "❌ Invalid alert type. Use 'deviation' or 'force'." # ✅ DATA EXPORT & ANALYSIS def export_movement_data(self, filename):