diff --git a/src/RSIPI/rsi_api.py b/src/RSIPI/rsi_api.py index 9d4f02a..76a582b 100644 --- a/src/RSIPI/rsi_api.py +++ b/src/RSIPI/rsi_api.py @@ -15,54 +15,6 @@ from src.RSIPI.live_plotter import LivePlotter from threading import Thread import asyncio - -def generate_report(filename, format_type): - """ - Generate a statistical report from a CSV log file. - - Args: - filename (str): Path to the CSV file (or base name without .csv). - format_type (str): 'csv', 'json', or 'pdf' - """ - # Ensure filename ends with .csv - if not filename.endswith(".csv"): - filename += ".csv" - - if not os.path.exists(filename): - raise FileNotFoundError(f"❌ File not found: {filename}") - - df = pd.read_csv(filename) - - # Only keep relevant columns (e.g. actual positions) - position_cols = [col for col in df.columns if col.startswith("Receive.RIst.")] - if not position_cols: - raise ValueError("❌ No 'Receive.RIst' position columns found in CSV.") - - report_data = { - "Max Position": df[position_cols].max().to_dict(), - "Mean Position": df[position_cols].mean().to_dict(), - } - - report_base = filename.replace(".csv", "") - output_path = f"{report_base}_report.{format_type.lower()}" - - if format_type == "csv": - pd.DataFrame(report_data).T.to_csv(output_path) - elif format_type == "json": - with open(output_path, "w") as f: - json.dump(report_data, f, indent=4) - elif format_type == "pdf": - fig, ax = plt.subplots() - pd.DataFrame(report_data).T.plot(kind='bar', ax=ax) - ax.set_title("RSI Position Report") - plt.tight_layout() - plt.savefig(output_path) - else: - raise ValueError(f"❌ Unsupported format: {format_type}") - - return f"✅ Report saved as {output_path}" - - class RSIAPI: """RSI API for programmatic control, including alerts, logging, graphing, and data retrieval.""" @@ -95,6 +47,52 @@ class RSIAPI: self.client.stop() return "✅ RSI stopped." + def generate_report(filename, format_type): + """ + Generate a statistical report from a CSV log file. + + Args: + filename (str): Path to the CSV file (or base name without .csv). + format_type (str): 'csv', 'json', or 'pdf' + """ + # Ensure filename ends with .csv + if not filename.endswith(".csv"): + filename += ".csv" + + if not os.path.exists(filename): + raise FileNotFoundError(f"❌ File not found: {filename}") + + df = pd.read_csv(filename) + + # Only keep relevant columns (e.g. actual positions) + position_cols = [col for col in df.columns if col.startswith("Receive.RIst.")] + if not position_cols: + raise ValueError("❌ No 'Receive.RIst' position columns found in CSV.") + + report_data = { + "Max Position": df[position_cols].max().to_dict(), + "Mean Position": df[position_cols].mean().to_dict(), + } + + report_base = filename.replace(".csv", "") + output_path = f"{report_base}_report.{format_type.lower()}" + + if format_type == "csv": + pd.DataFrame(report_data).T.to_csv(output_path) + elif format_type == "json": + with open(output_path, "w") as f: + json.dump(report_data, f, indent=4) + elif format_type == "pdf": + fig, ax = plt.subplots() + pd.DataFrame(report_data).T.plot(kind='bar', ax=ax) + ax.set_title("RSI Position Report") + plt.tight_layout() + plt.savefig(output_path) + else: + raise ValueError(f"❌ Unsupported format: {format_type}") + + return f"✅ Report saved as {output_path}" + def update_variable(self, name, value): if "." in name: parent, child = name.split(".", 1) @@ -483,12 +481,24 @@ class RSIAPI: def update_cartesian(self, **kwargs): """ Update Cartesian correction values (RKorr). - Example: update_cartesian(X=10.0, Y=0.0, Z=0.0) """ + self._ensure_client() + if "RKorr" not in self.client.send_variables: + print("⚠️ Warning: RKorr not configured in send_variables. Skipping Cartesian update.") + return + for axis, value in kwargs.items(): self.update_variable(f"RKorr.{axis}", float(value)) def update_joints(self, **kwargs): + """ + Update joint correction values (AKorr). + """ + self._ensure_client() + if "AKorr" not in self.client.send_variables: + print("⚠️ Warning: AKorr not configured in send_variables. Skipping Joint update.") + return + for axis, value in kwargs.items(): self.update_variable(f"AKorr.{axis}", float(value)) @@ -516,3 +526,84 @@ class RSIAPI: except KeyboardInterrupt: print("\n🛑 Stopped network watch.") + + # --- 🤖 High-level Cartesian and Joint Trajectory Movement --- + + def move_cartesian_trajectory(self, start_pose, end_pose, steps=50, rate=0.012): + """ + Generate and execute a Cartesian (TCP) movement between two poses. + Args: + start_pose (dict): e.g. {"X":0, "Y":0, "Z":500} + end_pose (dict): e.g. {"X":100, "Y":0, "Z":500} + steps (int): Number of interpolation points. + rate (float): Time between points in seconds. + """ + trajectory = self.generate_trajectory(start_pose, end_pose, steps=steps, space="cartesian") + self.execute_trajectory(trajectory, space="cartesian", rate=rate) + + def move_joint_trajectory(self, start_joints, end_joints, steps=50, rate=0.4): + """ + Generate and execute a Joint-space movement between two poses. + Args: + start_joints (dict): e.g. {"A1":0, "A2":0, "A3":0, ...} + end_joints (dict): e.g. {"A1":90, "A2":0, "A3":0, ...} + steps (int): Number of interpolation points. + rate (float): Time between points in seconds. + """ + trajectory = self.generate_trajectory(start_joints, end_joints, steps=steps, space="joint") + self.execute_trajectory(trajectory, space="joint", rate=rate) + + def queue_cartesian_trajectory(self, start_pose, end_pose, steps=50, rate=0.012): + """ + Generate and queue a Cartesian movement (no execution). + """ + if not isinstance(start_pose, dict) or not isinstance(end_pose, dict): + raise ValueError("❌ start_pose and end_pose must be dictionaries (e.g., {'X': 0, 'Y': 0, 'Z': 500})") + if steps <= 0: + raise ValueError("❌ Steps must be greater than zero.") + if rate <= 0: + raise ValueError("❌ Rate must be greater than zero.") + + trajectory = self.generate_trajectory(start_pose, end_pose, steps=steps, space="cartesian") + self.queue_trajectory(trajectory, "cartesian", rate) + + def queue_joint_trajectory(self, start_joints, end_joints, steps=50, rate=0.4): + """ + Generate and queue a Joint-space movement (no execution). + """ + if not isinstance(start_joints, dict) or not isinstance(end_joints, dict): + raise ValueError("❌ start_joints and end_joints must be dictionaries (e.g., {'A1': 0, 'A2': 0})") + if steps <= 0: + raise ValueError("❌ Steps must be greater than zero.") + if rate <= 0: + raise ValueError("❌ Rate must be greater than zero.") + + trajectory = self.generate_trajectory(start_joints, end_joints, steps=steps, space="joint") + self.queue_trajectory(trajectory, "joint", rate) + + # --- 🛡️ Safety Management --- + + def safety_stop(self): + """Trigger emergency stop.""" + self._ensure_client() + self.client.safety_manager.emergency_stop() + + def safety_reset(self): + """Reset emergency stop.""" + self._ensure_client() + self.client.safety_manager.reset_stop() + + def safety_status(self): + """Return detailed safety status.""" + self._ensure_client() + sm = self.client.safety_manager + return { + "emergency_stop": sm.is_stopped(), + "safety_override": self.is_safety_overridden(), + "limits": sm.get_limits(), + } + + def safety_set_limit(self, variable, lower, upper): + """Set new safety limit bounds for a specific variable.""" + self._ensure_client() + self.client.safety_manager.set_limit(variable, float(lower), float(upper)) diff --git a/src/RSIPI/rsi_cli.py b/src/RSIPI/rsi_cli.py index f35e6a5..145b4a8 100644 --- a/src/RSIPI/rsi_cli.py +++ b/src/RSIPI/rsi_cli.py @@ -44,7 +44,8 @@ class RSICommandLineInterface: group = parts[0] self.client.show_variables(group) elif cmd == "ipoc": - self.client.get_ipoc() + ipoc = self.client.get_ipoc() + print(f"🛰 Current IPOC: {ipoc}") elif cmd == "watch": duration = float(parts[0]) if parts else None self.client.watch_network(duration=duration) @@ -279,7 +280,7 @@ class RSICommandLineInterface: if format_type not in ["csv", "json", "pdf"]: print("❌ Invalid format. Use 'csv', 'json', or 'pdf'.") return - generate_report(filename, format_type) + self.client.generate_report(filename, format_type) print(f"✅ Report generated: {filename}.{format_type}") @@ -323,15 +324,15 @@ Available Commands: set_alert_threshold """) - def visualize(self, csv_file, export=False): + def visualise(self, csv_file, export=False): try: - visualizer = Kukarsivisualiser(csv_file) - visualizer.plot_trajectory() - visualizer.plot_joint_positions() - visualizer.plot_force_trends() + visualiser = KukaRSIVisualiser(csv_file) + visualiser.plot_trajectory() + visualiser.plot_joint_positions() + visualiser.plot_force_trends() if export: - visualizer.export_graphs() + visualiser.export_graphs() print(f"✅ Visualisations exported for '{csv_file}'") except Exception as e: print(f"❌ Failed to visualize '{csv_file}': {e}")