From d40ba36c7437e8880db7147da18b07d1612bcb90 Mon Sep 17 00:00:00 2001 From: Adam Date: Fri, 25 Apr 2025 21:18:46 +0100 Subject: [PATCH] All of API has been tested and is working as expeted. Fixed an error where network was always initiliasing without it being needed. --- src/RSIPI/kuka_visualiser.py | 7 +++- src/RSIPI/main.py | 50 ++------------------------- src/RSIPI/rsi_api.py | 66 ++++++++++++++++++++++++------------ 3 files changed, 53 insertions(+), 70 deletions(-) diff --git a/src/RSIPI/kuka_visualiser.py b/src/RSIPI/kuka_visualiser.py index 345e7bf..71f64c3 100644 --- a/src/RSIPI/kuka_visualiser.py +++ b/src/RSIPI/kuka_visualiser.py @@ -41,7 +41,12 @@ class KukaRSIVisualiser: fig = plt.figure() ax = fig.add_subplot(111, projection='3d') - ax.plot(self.df["RIst.X"], self.df["RIst.Y"], self.df["RIst.Z"], + def safe_col(name): + return name if name in self.df.columns else f"Receive.{name}" + + ax.plot(self.df[safe_col("RIst.X")], + self.df[safe_col("RIst.Y")], + self.df[safe_col("RIst.Z")], label="Actual Trajectory", linestyle='-') if "RSol.X" in self.df.columns: diff --git a/src/RSIPI/main.py b/src/RSIPI/main.py index 99621ea..4463ecc 100644 --- a/src/RSIPI/main.py +++ b/src/RSIPI/main.py @@ -5,58 +5,12 @@ import time import threading -def generate_figure_8_z(start_xyz, radius=100, loops=1, steps=100): - trajectory = [] - for i in range(steps): - t = 2 * math.pi * loops * (i / steps) - x = radius * math.sin(t) - y = radius * math.sin(t) * math.cos(t) - z = radius * math.sin(2 * t) / 2 - trajectory.append({ - "X": start_xyz["X"] + x, - "Y": start_xyz["Y"] + y, - "Z": start_xyz["Z"] + z - }) - return trajectory + def main(): api = RSIAPI("RSI_EthernetConfig.xml") - print("🛜 Starting RSI client...") - api.start_rsi() - time.sleep(2) - - start_xyz = {"X": 500, "Y": 0, "Z": 500} - trajectory = api.generate_trajectory( - start=start_xyz, - end=generate_figure_8_z(start_xyz, radius=100, loops=1, steps=1)[0], # endpoint unused - steps=100, - space="cartesian", - mode="absolute" - ) - # Overwrite with full 8-curve, skipping internal interpolation - trajectory = generate_figure_8_z(start_xyz, radius=100, loops=1, steps=100) - - print(f"✅ Generated figure-8 trajectory with {len(trajectory)} steps.") - - # 🧠 Start live plotting in a thread (in mode 3d) - def start_plot(): - plotter = LivePlotter(api.client, mode="3d", interval=50) - plotter.start() - - threading.Thread(target=start_plot, daemon=True).start() - - print("🟢 Executing trajectory...") - api.execute_trajectory(trajectory, space="cartesian", rate=0.02) - - time.sleep(len(trajectory) * 0.02 + 2.0) - print("✅ Trajectory execution complete.") - - print("🛑 Stopping RSI...") - api.stop_rsi() - print("✅ RSI client stopped.") - - + print(api.compare_test_runs("25-04-2025_16-33-47.csv", "25-04-2025_20-57-59.csv")) if __name__ == "__main__": main() diff --git a/src/RSIPI/rsi_api.py b/src/RSIPI/rsi_api.py index 7e3d063..01e2618 100644 --- a/src/RSIPI/rsi_api.py +++ b/src/RSIPI/rsi_api.py @@ -21,7 +21,8 @@ class RSIAPI: def __init__(self, config_file="RSI_EthernetConfig.xml"): """Initialize RSIAPI with an RSI client instance.""" self.thread = None - self.client = RSIClient(config_file) + self.config_file = config_file + self.client = None # Delay instantiation self.graph_process = None # Store graphing process self.graphing_instance = None self.graph_thread = None# @@ -29,8 +30,14 @@ class RSIAPI: self.live_plotter = None self.live_plot_thread = None + def _ensure_client(self): + """Ensure RSIClient is initialised before use.""" + if self.client is None: + from .rsi_client import RSIClient + self.client = RSIClient(self.config_file) + def start_rsi(self): - """Start the RSI client in a background thread.""" + self._ensure_client() self.thread = threading.Thread(target=self.client.start, daemon=True) self.thread.start() return "✅ RSI started in background." @@ -275,34 +282,51 @@ class RSIAPI: return f"✅ {alert_type.capitalize()} alert threshold set to {value}" return "❌ Invalid alert type. Use 'deviation' or 'force'." - ## TODO Test from here. - def generate_report(self, filename, format_type): - """Generate a statistical report from movement data.""" - data = self.client.get_movement_data() - df = pd.DataFrame(data) + """ + Generate a statistical report from a CSV log file. - report = { - "Max Position Deviation": df.iloc[:, 1:].max().to_dict(), - "Mean Position Deviation": df.iloc[:, 1:].mean().to_dict(), + Args: + filename (str): Path to the CSV file (or base name without .csv). + format_type (str): 'csv', 'json', or 'pdf' + """ + # Ensure filename ends with .csv + if not filename.endswith(".csv"): + filename += ".csv" + + if not os.path.exists(filename): + raise FileNotFoundError(f"❌ File not found: {filename}") + + df = pd.read_csv(filename) + + # Only keep relevant columns (e.g. actual positions) + position_cols = [col for col in df.columns if col.startswith("Receive.RIst.")] + if not position_cols: + raise ValueError("❌ No 'Receive.RIst' position columns found in CSV.") + + report_data = { + "Max Position": df[position_cols].max().to_dict(), + "Mean Position": df[position_cols].mean().to_dict(), } - path = f"{filename}.{format_type.lower()}" + report_base = filename.replace(".csv", "") + output_path = f"{report_base}_report.{format_type.lower()}" if format_type == "csv": - df.to_csv(path, index=False) + pd.DataFrame(report_data).T.to_csv(output_path) elif format_type == "json": - with open(path, "w") as f: - f.write(json.dumps(report)) + with open(output_path, "w") as f: + json.dump(report_data, f, indent=4) elif format_type == "pdf": fig, ax = plt.subplots() - df.plot(ax=ax) - plt.savefig(path) + pd.DataFrame(report_data).T.plot(kind='bar', ax=ax) + ax.set_title("RSI Position Report") + plt.tight_layout() + plt.savefig(output_path) else: - raise ValueError(f"❌ Unsupported report format: {format_type}") - - return f"✅ Report saved as {path}" + raise ValueError(f"❌ Unsupported format: {format_type}") + return f"✅ Report saved as {output_path}" @staticmethod def visualise_csv_log(csv_file, export=False): @@ -321,6 +345,7 @@ class RSIAPI: if export: visualizer.export_graphs() + ## TODO Need to test parsing krl to csv @staticmethod def parse_krl_to_csv(src_file, dat_file, output_file): """ @@ -340,6 +365,7 @@ class RSIAPI: except Exception as e: return f"❌ Error parsing KRL files: {e}" + ## TODO Need to test injecting RSI code. @staticmethod def inject_rsi(input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"): """ @@ -357,8 +383,6 @@ class RSIAPI: except Exception as e: return f"❌ RSI injection failed: {e}" - ## TODO here - @staticmethod def generate_trajectory(start, end, steps=100, space="cartesian", mode="absolute", include_resets=False): """Generates a linear trajectory (Cartesian or Joint)."""