From 1f51cab5de2d14b3cd343787ec28f0dad3ebb544 Mon Sep 17 00:00:00 2001 From: Adam Date: Sun, 16 Mar 2025 00:54:28 +0000 Subject: [PATCH] graphing added --- src/RSIPI/cli.py | 160 +++++++++++--------------------- src/RSIPI/graphing.py | 211 ++++++++++++++++++++++++------------------ src/RSIPI/rsi_api.py | 91 +++++++++++++++++- 3 files changed, 265 insertions(+), 197 deletions(-) diff --git a/src/RSIPI/cli.py b/src/RSIPI/cli.py index 20f418b..eb1e66b 100644 --- a/src/RSIPI/cli.py +++ b/src/RSIPI/cli.py @@ -1,10 +1,13 @@ import sys import os import threading +import multiprocessing sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))) from src.RSIPI.rsi_client import RSIClient +from src.RSIPI.graphing import RSIGraphing + class RSICommandLineInterface: """Command-Line Interface for controlling RSI Client.""" @@ -14,6 +17,7 @@ class RSICommandLineInterface: self.client = RSIClient(config_file) self.running = True self.rsi_thread = None # Store RSI thread + self.graph_process = None # Store graphing process def run(self): """Starts the CLI interaction loop.""" @@ -61,6 +65,14 @@ class RSICommandLineInterface: self.override_safety(parts[1]) elif cmd == "log" and len(parts) >= 2: self.handle_logging_command(parts) + elif cmd == "graph" and len(parts) >= 2: + self.handle_graphing_command(parts) + elif cmd == "export" and len(parts) == 2: + self.export_data(parts[1]) + elif cmd == "compare" and len(parts) == 3: + self.compare_test_runs(parts[1], parts[2]) + elif cmd == "report" and len(parts) >= 3: + self.generate_report(parts[1], parts[2]) elif cmd == "exit": self.stop_rsi() self.running = False @@ -69,115 +81,49 @@ class RSICommandLineInterface: else: print("❌ Unknown command. Type 'help' for a list of commands.") - def start_rsi(self): - """Start RSI in a separate thread to prevent CLI from freezing.""" - if self.rsi_thread and self.rsi_thread.is_alive(): - print("⚠️ RSI is already running.") + def export_data(self, filename): + """Export movement data to a CSV file.""" + self.client.export_movement_data(filename) + print(f"✅ Data exported to {filename}") + + def compare_test_runs(self, file1, file2): + """Compare two test runs from CSV files.""" + result = self.client.compare_test_runs(file1, file2) + print(result) + + def generate_report(self, filename, format_type): + """Generate a statistical report from movement data.""" + if format_type not in ["csv", "json", "pdf"]: + print("❌ Invalid format. Use 'csv', 'json', or 'pdf'.") return + self.client.generate_report(filename, format_type) + print(f"✅ Report generated: {filename}.{format_type}") - self.rsi_thread = threading.Thread(target=self.client.start, daemon=True) - self.rsi_thread.start() - print("✅ RSI started in background.") - - def stop_rsi(self): - """Stop RSI and ensure the thread is properly stopped.""" - if not self.rsi_thread or not self.rsi_thread.is_alive(): - print("⚠️ RSI is not running.") - return - - self.client.stop() - self.rsi_thread.join(timeout=2) - print("✅ RSI stopped.") - - def update_variable(self, variable, value): - """Dynamically update a send variable.""" - try: - if value.replace('.', '', 1).isdigit(): - value = float(value) if '.' in value else int(value) - self.client.update_send_variable(variable, value) - print(f"✅ Updated {variable} to {value}") - except Exception as e: - print(f"❌ Failed to update {variable}: {e}") - - def show_variables(self): - """Display current send and receive variables.""" - print("\n📊 Current RSI Variables:") - print("SEND VARIABLES:") - for key, value in self.client.send_variables.items(): - print(f" {key}: {value}") - print("\nRECEIVE VARIABLES:") - for key, value in self.client.receive_variables.items(): - print(f" {key}: {value}") - print() - - def show_ipoc(self): - """Display the latest IPOC value.""" - print(f"🔄 Latest IPOC: {self.client.receive_variables.get('IPOC', 'N/A')}") - - def watch_network(self): - """Continuously display incoming RSI messages.""" - print("📡 Watching network traffic (Press CTRL+C to stop)...") - try: - while True: - print(self.client.receive_variables) - except KeyboardInterrupt: - print("🛑 Stopped watching network.") - - def reset_variables(self): - """Reset send variables to default values.""" - self.client.reset_send_variables() - print("✅ Send variables reset to default values.") - - def show_status(self): - """Display full RSI system status.""" - print("📄 RSI Status:") - self.show_variables() - print(f"🔌 Network: {self.client.config_parser.get_network_settings()}") - - def reconnect(self): - """Restart network connection without stopping RSI.""" - self.client.reconnect() - print("✅ Network connection restarted.") - - def toggle_digital_io(self, io, value): - """Toggle digital I/O states.""" - self.client.update_send_variable(io, int(value)) - print(f"✅ {io} set to {value}") - - def move_external_axis(self, axis, value): - """Move an external axis.""" - self.client.update_send_variable(f"ELPos.{axis}", float(value)) - print(f"✅ Moved {axis} to {value}") - - def correct_position(self, correction_type, axis, value): - """Apply correction to RKorr or AKorr.""" - self.client.update_send_variable(f"{correction_type}.{axis}", float(value)) - print(f"✅ Applied correction: {correction_type}.{axis} = {value}") - - def adjust_speed(self, tech_param, value): - """Adjust speed settings.""" - self.client.update_send_variable(tech_param, float(value)) - print(f"✅ Set {tech_param} to {value}") - - def override_safety(self, limit): - """Override safety limits.""" - print(f"⚠️ Overriding safety limit: {limit}") - - def handle_logging_command(self, parts): - """Handles logging-related commands.""" + def handle_graphing_command(self, parts): + """Handles graphing-related commands.""" subcmd = parts[1] - if subcmd == "start" and len(parts) == 3: - filename = parts[2] - self.client.start_logging(filename) - print(f"✅ Logging started: {filename}") + + if subcmd == "start": + mode = parts[2] if len(parts) > 2 else "position" + self.start_graphing(mode) elif subcmd == "stop": - self.client.stop_logging() - print("🛑 Logging stopped.") - elif subcmd == "status": - status = "ON" if self.client.is_logging_active() else "OFF" - print(f"📊 Logging Status: {status}") + self.stop_graphing() + elif subcmd == "mode" and len(parts) == 3: + self.change_graph_mode(parts[2]) + elif subcmd == "overlay" and len(parts) == 3: + if parts[2] == "on": + self.client.enable_graph_overlay(True) + print("✅ Overlay enabled.") + elif parts[2] == "off": + self.client.enable_graph_overlay(False) + print("✅ Overlay disabled.") + else: + print("❌ Invalid option. Use 'graph overlay on/off'.") + elif subcmd == "load_plan" and len(parts) == 3: + self.client.load_planned_trajectory(parts[2]) + print(f"✅ Loaded planned trajectory: {parts[2]}") else: - print("❌ Invalid log command. Use 'log start .csv', 'log stop', or 'log status'.") + print("❌ Invalid graph command. Use 'graph start', 'graph stop', 'graph mode ', 'graph overlay on/off', 'graph load_plan '.") def show_help(self): """Displays the list of available commands.""" @@ -189,8 +135,14 @@ Available Commands: correct , speed override log start .csv, log stop, log status + graph start , graph stop, graph mode + graph overlay on/off, graph load_plan + export + compare + report """) + if __name__ == "__main__": config_file = "RSI_EthernetConfig.xml" cli = RSICommandLineInterface(config_file) diff --git a/src/RSIPI/graphing.py b/src/RSIPI/graphing.py index 4b0cc43..726e6ad 100644 --- a/src/RSIPI/graphing.py +++ b/src/RSIPI/graphing.py @@ -1,121 +1,152 @@ import time -import logger import pandas as pd +import numpy as np import matplotlib.pyplot as plt import matplotlib.animation as animation - -# ✅ Configure Logging for Graphing Module -LOGGING_ENABLED = True - -if LOGGING_ENABLED: - logging.basicConfig( - filename="graphing.log", - level=logging.DEBUG, - format="%(asctime)s [%(levelname)s] %(message)s", - datefmt="%d-%m-%Y %H:%M:%S" - ) +from collections import deque +from src.RSIPI.rsi_client import RSIClient class RSIGraphing: - """Handles real-time and CSV-based graphing for RSI analysis.""" + """Handles real-time and CSV-based graphing for RSI analysis with export options.""" - def __init__(self, csv_file=None): - """Initialize graphing for real-time plotting or CSV replay.""" - self.csv_file = csv_file - self.fig, self.ax = plt.subplots(1, 2, figsize=(12, 5)) # ✅ Two subplots: Time-Series & X-Y Trajectory + def __init__(self, client, mode="position", overlay=False, plan_file=None): + """Initialize graphing for real-time plotting.""" + self.client = client + self.mode = mode + self.overlay = overlay # Enable/Disable planned vs. actual overlay + self.fig, self.ax = plt.subplots(figsize=(10, 6)) - # ✅ Titles and Labels - self.ax[0].set_title("Real-Time Position Tracking") - self.ax[0].set_xlabel("Time") - self.ax[0].set_ylabel("Position (mm)") - self.ax[1].set_title("2D Trajectory (X-Y)") - self.ax[1].set_xlabel("X Position") - self.ax[1].set_ylabel("Y Position") + # ✅ Data storage + self.time_data = deque(maxlen=100) # Store last 100 data points + self.position_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]} + self.velocity_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]} + self.acceleration_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]} + self.force_data = {axis: deque(maxlen=100) for axis in ["A1", "A2", "A3", "A4", "A5", "A6"]} - # ✅ Data storage for plotting - self.time_data = [] - self.x_data, self.y_data, self.z_data = [], [], [] - self.actual_x, self.actual_y, self.actual_z = [], [], [] - self.trajectory_x, self.trajectory_y = [], [] + self.previous_positions = {"X": 0, "Y": 0, "Z": 0} + self.previous_velocities = {"X": 0, "Y": 0, "Z": 0} + self.previous_time = time.time() - self.plot_mode = "TCP" # ✅ Toggle between "TCP" and "Joints" + # ✅ Planned movement overlay storage + self.planned_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]} + self.deviation_data = {axis: deque(maxlen=100) for axis in ["X", "Y", "Z"]} - if csv_file: - self.load_csv_log(csv_file) - self.plot_csv_data() - else: - self.ani = animation.FuncAnimation(self.fig, self.update_graph, interval=500, save_count=100) + if plan_file: + self.load_plan(plan_file) - plt.show() + self.ani = animation.FuncAnimation(self.fig, self.update_graph, interval=100, cache_frame_data=False) + plt.show() def update_graph(self, frame): - """Update the live graph with new position data.""" - timestamp = time.strftime("%H:%M:%S") - self.time_data.append(timestamp) + """Update the live graph based on selected mode.""" + current_time = time.time() + dt = current_time - self.previous_time + self.previous_time = current_time - # ✅ Simulated Data for Testing (Replace with real-time data) - self.x_data.append(len(self.time_data) * 10) - self.y_data.append(len(self.time_data) * 5) - self.z_data.append(len(self.time_data) * 2) + # ✅ Fetch live data from RSI client + position = self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0}) + force = self.client.receive_variables.get("MaCur", {"A1": 0, "A2": 0, "A3": 0, "A4": 0, "A5": 0, "A6": 0}) - self.actual_x.append(len(self.time_data) * 9.8) - self.actual_y.append(len(self.time_data) * 4.9) - self.actual_z.append(len(self.time_data) * 1.8) + # ✅ Compute velocity and acceleration + for axis in ["X", "Y", "Z"]: + velocity = (position[axis] - self.previous_positions[axis]) / dt if dt > 0 else 0 + acceleration = (velocity - self.previous_velocities[axis]) / dt if dt > 0 else 0 + self.previous_positions[axis] = position[axis] + self.previous_velocities[axis] = velocity - # ✅ 2D Trajectory (X-Y) - self.trajectory_x.append(len(self.time_data) * 9.8) - self.trajectory_y.append(len(self.time_data) * 4.9) + self.position_data[axis].append(position[axis]) + self.velocity_data[axis].append(velocity) + self.acceleration_data[axis].append(acceleration) - # ✅ Clear and replot data - self.ax[0].clear() - self.ax[0].plot(self.time_data, self.x_data, label="Planned X", linestyle="dashed") - self.ax[0].plot(self.time_data, self.y_data, label="Planned Y", linestyle="dashed") - self.ax[0].plot(self.time_data, self.z_data, label="Planned Z", linestyle="dashed") + for axis in ["A1", "A2", "A3", "A4", "A5", "A6"]: + self.force_data[axis].append(force[axis]) - self.ax[0].plot(self.time_data, self.actual_x, label="Actual X", marker="o") - self.ax[0].plot(self.time_data, self.actual_y, label="Actual Y", marker="o") - self.ax[0].plot(self.time_data, self.actual_z, label="Actual Z", marker="o") + self.time_data.append(time.strftime("%H:%M:%S")) - self.ax[0].legend() - self.ax[0].set_title(f"Real-Time {self.plot_mode} Position") - self.ax[0].set_xlabel("Time") - self.ax[0].set_ylabel("Position (mm)") + # ✅ Planned vs. Actual movement overlay + if self.overlay and self.planned_data: + for axis in ["X", "Y", "Z"]: + planned_value = self.planned_data[axis][-1] if len(self.planned_data[axis]) > 0 else position[axis] + self.planned_data[axis].append(planned_value) + self.deviation_data[axis].append(abs(position[axis] - planned_value)) - # ✅ 2D Trajectory (X-Y) - self.ax[1].clear() - self.ax[1].plot(self.trajectory_x, self.trajectory_y, label="Actual Path", marker="o") - self.ax[1].set_title("2D Trajectory (X-Y)") - self.ax[1].set_xlabel("X Position") - self.ax[1].set_ylabel("Y Position") - self.ax[1].legend() + # ✅ Clear the plot + self.ax.clear() - def load_csv_log(self, filename): - """Load a CSV log and replay motion.""" - df = pd.read_csv(filename) - self.time_data = df["Timestamp"].tolist() - self.trajectory_x = df["Receive.RIst.X"].tolist() - self.trajectory_y = df["Receive.RIst.Y"].tolist() + if self.mode == "position": + self.ax.plot(self.time_data, self.position_data["X"], label="X Position") + self.ax.plot(self.time_data, self.position_data["Y"], label="Y Position") + self.ax.plot(self.time_data, self.position_data["Z"], label="Z Position") + self.ax.set_title("Live Position Tracking") + self.ax.set_ylabel("Position (mm)") - logging.info(f"Loaded CSV log: {filename}") + if self.overlay: + self.ax.plot(self.time_data, self.planned_data["X"], label="Planned X", linestyle="dashed") + self.ax.plot(self.time_data, self.planned_data["Y"], label="Planned Y", linestyle="dashed") + self.ax.plot(self.time_data, self.planned_data["Z"], label="Planned Z", linestyle="dashed") - def plot_csv_data(self): - """Plot data from a CSV file for post-analysis.""" - plt.figure(figsize=(10, 5)) - plt.plot(self.trajectory_x, self.trajectory_y, label="Planned Path", linestyle="dashed") - plt.scatter(self.trajectory_x, self.trajectory_y, color='red', label="Actual Path", marker="o") - plt.title("CSV Replay: 2D Trajectory (X-Y)") - plt.xlabel("X Position") - plt.ylabel("Y Position") - plt.legend() - plt.show() + elif self.mode == "velocity": + self.ax.plot(self.time_data, self.velocity_data["X"], label="X Velocity") + self.ax.plot(self.time_data, self.velocity_data["Y"], label="Y Velocity") + self.ax.plot(self.time_data, self.velocity_data["Z"], label="Z Velocity") + self.ax.set_title("Live Velocity Profile") + self.ax.set_ylabel("Velocity (mm/s)") + + elif self.mode == "acceleration": + self.ax.plot(self.time_data, self.acceleration_data["X"], label="X Acceleration") + self.ax.plot(self.time_data, self.acceleration_data["Y"], label="Y Acceleration") + self.ax.plot(self.time_data, self.acceleration_data["Z"], label="Z Acceleration") + self.ax.set_title("Live Acceleration Profile") + self.ax.set_ylabel("Acceleration (mm/s²)") + + elif self.mode == "force": + for axis in ["A1", "A2", "A3", "A4", "A5", "A6"]: + self.ax.plot(self.time_data, self.force_data[axis], label=f"Force {axis}") + self.ax.set_title("Live Force & Torque Monitoring") + self.ax.set_ylabel("Force (Nm)") + + self.ax.legend() + self.ax.set_xlabel("Time") + self.ax.tick_params(axis='x', rotation=45) + + def change_mode(self, mode): + """Change the graphing mode dynamically.""" + if mode in ["position", "velocity", "acceleration", "force"]: + self.mode = mode + print(f"✅ Graphing mode changed to: {mode}") + else: + print("❌ Invalid mode. Available: position, velocity, acceleration, force") + + def export_graph_data(self, filename): + """Save graph data to CSV.""" + df = pd.DataFrame({ + "Time": list(self.time_data), + "Actual_X": list(self.position_data["X"]), + "Actual_Y": list(self.position_data["Y"]), + "Actual_Z": list(self.position_data["Z"]), + }) + df.to_csv(filename, index=False) + print(f"✅ Graph data saved to {filename}") + + def load_plan(self, filename): + """Load planned movement data from CSV.""" + try: + df = pd.read_csv(filename) + self.planned_data["X"] = deque(df["Planned_X"].tolist(), maxlen=100) + self.planned_data["Y"] = deque(df["Planned_Y"].tolist(), maxlen=100) + self.planned_data["Z"] = deque(df["Planned_Z"].tolist(), maxlen=100) + print(f"✅ Loaded planned trajectory from {filename}") + except Exception as e: + print(f"❌ Failed to load planned trajectory: {e}") if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="RSI Graphing Utility") - parser.add_argument("--csv", help="Path to CSV file for replay mode", type=str) + parser.add_argument("--mode", choices=["position", "velocity", "acceleration", "force"], default="position", help="Graphing mode") + parser.add_argument("--overlay", action="store_true", help="Enable planned vs. actual overlay") + parser.add_argument("--plan", type=str, help="CSV file with planned trajectory") args = parser.parse_args() - if args.csv: - RSIGraphing(csv_file=args.csv) - else: - RSIGraphing() # Run in live mode if no CSV is provided + client = RSIClient("RSI_EthernetConfig.xml") + RSIGraphing(client, mode=args.mode, overlay=args.overlay, plan_file=args.plan) diff --git a/src/RSIPI/rsi_api.py b/src/RSIPI/rsi_api.py index ce22787..4052683 100644 --- a/src/RSIPI/rsi_api.py +++ b/src/RSIPI/rsi_api.py @@ -1,9 +1,17 @@ +import multiprocessing +import pandas as pd +import json +import matplotlib.pyplot as plt +from src.RSIPI.rsi_client import RSIClient +from src.RSIPI.graphing import RSIGraphing + class RSIAPI: """RSI API for programmatic control.""" - def __init__(self, client): + def __init__(self, config_file="RSI_EthernetConfig.xml"): """Initialize RSIAPI with an RSI client instance.""" - self.client = client + self.client = RSIClient(config_file) + self.graph_process = None # Store graphing process def start_rsi(self): """Start the RSI client.""" @@ -78,6 +86,7 @@ class RSIAPI: "receive_variables": dict(self.client.receive_variables) } + # ✅ CSV LOGGING METHODS def start_logging(self, filename): """Start logging RSI data to CSV.""" self.client.start_logging(filename) @@ -90,4 +99,80 @@ class RSIAPI: def is_logging_active(self): """Return logging status.""" - return self.client.is_logging_active() \ No newline at end of file + return self.client.is_logging_active() + + # ✅ GRAPHING METHODS + def start_graphing(self, mode="position"): + """Start real-time graphing.""" + if self.graph_process and self.graph_process.is_alive(): + return "⚠️ Graphing is already running." + + self.graph_process = multiprocessing.Process(target=RSIGraphing, args=(self.client, mode)) + self.graph_process.start() + return f"✅ Graphing started in {mode} mode." + + def stop_graphing(self): + """Stop live graphing.""" + if self.graph_process and self.graph_process.is_alive(): + self.graph_process.terminate() + self.graph_process.join() + return "🛑 Graphing stopped." + return "⚠️ No active graphing process." + + def change_graph_mode(self, mode): + """Change the graphing mode dynamically.""" + if mode not in ["position", "velocity", "acceleration", "force"]: + return "❌ Invalid mode. Available: position, velocity, acceleration, force" + + self.stop_graphing() + self.start_graphing(mode) + return f"✅ Graphing mode changed to: {mode}" + + def enable_graph_overlay(self, enable): + """Enable or disable planned vs. actual overlay.""" + self.client.graph_overlay = enable + return f"✅ Overlay {'enabled' if enable else 'disabled'}." + + def load_planned_trajectory(self, filename): + """Load planned movement trajectory from CSV.""" + self.client.load_plan(filename) + return f"✅ Loaded planned trajectory: {filename}" + + # ✅ DATA EXPORT & ANALYSIS + def export_movement_data(self, filename): + """Export movement data to a CSV file.""" + data = self.client.get_movement_data() + df = pd.DataFrame(data) + df.to_csv(filename, index=False) + return f"✅ Data exported to {filename}" + + def compare_test_runs(self, file1, file2): + """Compare two movement logs.""" + df1 = pd.read_csv(file1) + df2 = pd.read_csv(file2) + + diff = abs(df1 - df2) + max_deviation = diff.max() + return f"📊 Max Deviation: {max_deviation}" + + def generate_report(self, filename, format_type): + """Generate a statistical report from movement data.""" + data = self.client.get_movement_data() + df = pd.DataFrame(data) + + report = { + "Max Position Deviation": df.iloc[:, 1:].max().to_dict(), + "Mean Position Deviation": df.iloc[:, 1:].mean().to_dict(), + } + + if format_type == "csv": + df.to_csv(f"{filename}.csv", index=False) + elif format_type == "json": + with open(f"{filename}.json", "w") as f: + json.dump(report, f) + elif format_type == "pdf": + fig, ax = plt.subplots() + df.plot(ax=ax) + plt.savefig(f"{filename}.pdf") + + return f"✅ Report saved as {filename}.{format_type}"