diff --git a/src/RSIPI/main.py b/src/RSIPI/main.py index 1da21f4..99621ea 100644 --- a/src/RSIPI/main.py +++ b/src/RSIPI/main.py @@ -1,66 +1,62 @@ +from src.RSIPI.rsi_api import RSIAPI +from src.RSIPI.live_plotter import LivePlotter +import math import time import threading -from winreg import REG_CREATED_NEW_KEY -from RSIPI.rsi_api import RSIAPI + +def generate_figure_8_z(start_xyz, radius=100, loops=1, steps=100): + trajectory = [] + for i in range(steps): + t = 2 * math.pi * loops * (i / steps) + x = radius * math.sin(t) + y = radius * math.sin(t) * math.cos(t) + z = radius * math.sin(2 * t) / 2 + trajectory.append({ + "X": start_xyz["X"] + x, + "Y": start_xyz["Y"] + y, + "Z": start_xyz["Z"] + z + }) + return trajectory + def main(): - from src.RSIPI.rsi_api import RSIAPI - api = RSIAPI("RSI_EthernetConfig.xml") - print(api.set_alert_threshold("deviation", 1.0)) - print(api.set_alert_threshold("force", 5.0)) - print(api.enable_alerts(True)) + print("🛜 Starting RSI client...") + api.start_rsi() + time.sleep(2) - api.client.safety_manager.set_limit("RKorr.X", -2.0, 2.0) - print("✅ Limit set for RKorr.X: -2.0 to 2.0") + start_xyz = {"X": 500, "Y": 0, "Z": 500} + trajectory = api.generate_trajectory( + start=start_xyz, + end=generate_figure_8_z(start_xyz, radius=100, loops=1, steps=1)[0], # endpoint unused + steps=100, + space="cartesian", + mode="absolute" + ) + # Overwrite with full 8-curve, skipping internal interpolation + trajectory = generate_figure_8_z(start_xyz, radius=100, loops=1, steps=100) - # === Try safe update === - try: - result = api.update_variable("RKorr.X", 1.0) - print(result) - print("✅ Safe value accepted.") - except Exception as e: - print(f"❌ Safe value was rejected unexpectedly: {e}") + print(f"✅ Generated figure-8 trajectory with {len(trajectory)} steps.") - # === Try unsafe update === - try: - result = api.update_variable("RKorr.X", 5.0) - print(result) - print("❌ Unsafe value was accepted unexpectedly!") - except Exception as e: - print(f"✅ Correctly blocked unsafe value: {e}") + # 🧠 Start live plotting in a thread (in mode 3d) + def start_plot(): + plotter = LivePlotter(api.client, mode="3d", interval=50) + plotter.start() - # === Emergency Stop === - api.client.safety_manager.emergency_stop() - try: - api.update_variable("RKorr.X", 0.0) - print("❌ Update passed during E-STOP (this is wrong)") - except Exception as e: - print(f"✅ Correctly blocked by E-STOP: {e}") + threading.Thread(target=start_plot, daemon=True).start() - # === Reset E-Stop === - api.client.safety_manager.reset_stop() - try: - result = api.update_variable("RKorr.X", 0.0) - print(result) - print("✅ Motion resumed after E-STOP reset.") - except Exception as e: - print(f"❌ Unexpected error after E-STOP reset: {e}") + print("🟢 Executing trajectory...") + api.execute_trajectory(trajectory, space="cartesian", rate=0.02) - # === Enable Override === - api.override_safety(True) - try: - result = api.update_variable("RKorr.X", 999.0) - print(result) - print("✅ Override allowed unsafe motion as expected.") - except Exception as e: - print(f"❌ Override failed: {e}") - - # === Check override status === - print("🛡️ Override status:", "ACTIVE" if api.is_safety_overridden() else "INACTIVE") + time.sleep(len(trajectory) * 0.02 + 2.0) + print("✅ Trajectory execution complete.") + print("🛑 Stopping RSI...") api.stop_rsi() + print("✅ RSI client stopped.") + + if __name__ == "__main__": main() diff --git a/src/RSIPI/rsi_api.py b/src/RSIPI/rsi_api.py index e37a12b..7e3d063 100644 --- a/src/RSIPI/rsi_api.py +++ b/src/RSIPI/rsi_api.py @@ -13,6 +13,7 @@ from src.RSIPI.static_plotter import StaticPlotter # Make sure this file exists import os from src.RSIPI.live_plotter import LivePlotter from threading import Thread +import asyncio class RSIAPI: """RSI API for programmatic control, including alerts, logging, graphing, and data retrieval.""" @@ -274,7 +275,7 @@ class RSIAPI: return f"✅ {alert_type.capitalize()} alert threshold set to {value}" return "❌ Invalid alert type. Use 'deviation' or 'force'." - ## TODO Testing to here. + ## TODO Test from here. def generate_report(self, filename, format_type): """Generate a statistical report from movement data.""" @@ -356,14 +357,40 @@ class RSIAPI: except Exception as e: return f"❌ RSI injection failed: {e}" + ## TODO here + @staticmethod - def generate_trajectory(start, end, steps=100, space="cartesian"): + def generate_trajectory(start, end, steps=100, space="cartesian", mode="absolute", include_resets=False): """Generates a linear trajectory (Cartesian or Joint).""" - return generate_trajectory(start, end, steps, space) + return generate_trajectory(start, end, steps, space, mode, include_resets) + + import asyncio def execute_trajectory(self, trajectory, space="cartesian", rate=0.012): - """Executes a trajectory using live updates.""" - return execute_trajectory(self, trajectory, space, rate) + """ + Executes a trajectory intelligently: + - If already inside an asyncio loop -> schedules task in background + - If no loop -> creates one and runs blocking + """ + + async def runner(): + for idx, point in enumerate(trajectory): + if space == "cartesian": + self.update_cartesian(**point) + elif space == "joint": + self.update_joints(**point) + else: + raise ValueError("space must be 'cartesian' or 'joint'") + print(f"🚀 Step {idx + 1}/{len(trajectory)} sent") + await asyncio.sleep(rate) + + try: + loop = asyncio.get_running_loop() + # If inside event loop, schedule runner as background task + asyncio.create_task(runner()) + except RuntimeError: + # If no event loop is running, create and run one + asyncio.run(runner()) def queue_trajectory(self, trajectory, space="cartesian", rate=0.012): """Adds a trajectory to the internal queue.""" diff --git a/src/RSIPI/rsi_cli.py b/src/RSIPI/rsi_cli.py index ce057b8..371a800 100644 --- a/src/RSIPI/rsi_cli.py +++ b/src/RSIPI/rsi_cli.py @@ -63,7 +63,15 @@ class RSICommandLineInterface: elif cmd == "speed" and len(parts) == 3: self.client.adjust_speed(parts[1], parts[2]) elif cmd == "override" and len(parts) == 2: - self.client.override_safety(parts[1]) + state = parts[1] + if state in ["on", "true", "1"]: + self.client.override_safety(True) + print("🛡️ Safety override ENABLED.") + elif state in ["off", "false", "0"]: + self.client.override_safety(False) + print("🛡️ Safety override DISABLED.") + else: + print("❌ Usage: override on | off") elif cmd == "log": if len(parts) < 1: print("⚠️ Usage: log start|stop|status") @@ -200,13 +208,15 @@ class RSICommandLineInterface: elif cmd == "safety-reset": self.client.safety_manager.reset_stop() print("✅ Safety: Emergency Stop reset. Motion allowed.") - elif cmd == "safety-status": sm = self.client.safety_manager - print("🧱 Safety Status: " + ("STOPPED" if sm.is_stopped() else "ACTIVE")) - print("📏 Enforced Limits:") + override_active = self.client.is_safety_overridden() + print("🛡️ Safety System Status:") + print(" 🧱 Emergency Stop:", "ACTIVE 🛑" if sm.is_stopped() else "CLEARED ✅") + print(" 🛡️ Safety Override:", "ENABLED" if override_active else "DISABLED") + print(" 📏 Enforced Limits:") for var, (lo, hi) in sm.get_limits().items(): - print(f" {var}: {lo} → {hi}") + print(f" - {var}: {lo} → {hi}") elif cmd == "plot" and len(parts) >= 2: plot_type = parts[1] if len(parts) < 3: @@ -305,7 +315,12 @@ Available Commands: export_movement_data - Export logged motion data to CSV compare_test_runs - Compare 2 test logs (e.g. deviation) generate_report [out.txt] - Create a movement analysis report - + safety-stop - Emergency stop: block motion + safety-reset - Reset emergency stop + safety-status - Show safety and override status + override on/off - Enable or disable safety override + alerts on/off + set_alert_threshold """) def visualize(self, csv_file, export=False): diff --git a/src/RSIPI/safety_manager.py b/src/RSIPI/safety_manager.py index 07357b8..71bcc75 100644 --- a/src/RSIPI/safety_manager.py +++ b/src/RSIPI/safety_manager.py @@ -69,4 +69,42 @@ class SafetyManager: def is_safety_overridden(self) -> bool: """Returns whether safety override is active.""" - return self.override \ No newline at end of file + return self.override + + @staticmethod + def check_cartesian_limits(pose): + """ + Check if a Cartesian pose is within general robot limits. + Typical bounds: ±1500 mm in XYZ, ±360° in orientation. + """ + limits = { + "X": (-1500, 1500), + "Y": (-1500, 1500), + "Z": (0, 2000), + "A": (-360, 360), + "B": (-360, 360), + "C": (-360, 360), + } + for key, (lo, hi) in limits.items(): + if key in pose and not (lo <= pose[key] <= hi): + return False + return True + + @staticmethod + def check_joint_limits(pose): + """ + Check if a joint-space pose is within KUKA limits. + Typical KUKA ranges: A1–A6 in defined degrees. + """ + limits = { + "A1": (-185, 185), + "A2": (-185, 185), + "A3": (-185, 185), + "A4": (-350, 350), + "A5": (-130, 130), + "A6": (-350, 350), + } + for key, (lo, hi) in limits.items(): + if key in pose and not (lo <= pose[key] <= hi): + return False + return True \ No newline at end of file diff --git a/src/RSIPI/trajectory_planner.py b/src/RSIPI/trajectory_planner.py index d512a7f..af9cc73 100644 --- a/src/RSIPI/trajectory_planner.py +++ b/src/RSIPI/trajectory_planner.py @@ -1,26 +1,21 @@ from RSIPI.safety_manager import SafetyManager import time -def generate_trajectory(start, end, steps=100, space="cartesian", mode="relative", include_resets=True): +def generate_trajectory(start, end, steps=100, space="cartesian", mode="absolute", include_resets=False): """ Generates a trajectory from start to end across N steps. - Args: - start (dict): Starting pose or joint values. - end (dict): Target pose or joint values. - steps (int): Number of interpolation steps. - space (str): "cartesian" or "joint" (for API dispatch and validation). - mode (str): "relative" or "absolute" movement type. - include_resets (bool): Inserts zero-correction resets between steps in relative mode. - - Returns: - list[dict]: List of movement command dictionaries. + - Absolute mode (default): full poses, no resets + - Relative mode: incremental steps, optional resets after each step """ if mode not in ["relative", "absolute"]: raise ValueError("mode must be 'relative' or 'absolute'") if space not in ["cartesian", "joint"]: raise ValueError("space must be 'cartesian' or 'joint'") + if mode == "absolute": + include_resets = False # Smart safeguard + axes = start.keys() trajectory = [] @@ -29,6 +24,7 @@ def generate_trajectory(start, end, steps=100, space="cartesian", mode="relative global enforce_safety enforce_safety = hasattr(SafetyManager, "check_cartesian_limits") # Enable only if those methods exist + for i in range(1, steps + 1): point = {} for axis in axes: