updated and fixed safety manager, updated trajectory code.

This commit is contained in:
Adam 2025-04-25 20:57:27 +01:00
parent 2750952e26
commit 82b01cc7e8
5 changed files with 144 additions and 72 deletions

View File

@ -1,66 +1,62 @@
from src.RSIPI.rsi_api import RSIAPI
from src.RSIPI.live_plotter import LivePlotter
import math
import time
import threading
from winreg import REG_CREATED_NEW_KEY
from RSIPI.rsi_api import RSIAPI
def generate_figure_8_z(start_xyz, radius=100, loops=1, steps=100):
trajectory = []
for i in range(steps):
t = 2 * math.pi * loops * (i / steps)
x = radius * math.sin(t)
y = radius * math.sin(t) * math.cos(t)
z = radius * math.sin(2 * t) / 2
trajectory.append({
"X": start_xyz["X"] + x,
"Y": start_xyz["Y"] + y,
"Z": start_xyz["Z"] + z
})
return trajectory
def main():
from src.RSIPI.rsi_api import RSIAPI
api = RSIAPI("RSI_EthernetConfig.xml")
print(api.set_alert_threshold("deviation", 1.0))
print(api.set_alert_threshold("force", 5.0))
print(api.enable_alerts(True))
print("🛜 Starting RSI client...")
api.start_rsi()
time.sleep(2)
api.client.safety_manager.set_limit("RKorr.X", -2.0, 2.0)
print("✅ Limit set for RKorr.X: -2.0 to 2.0")
start_xyz = {"X": 500, "Y": 0, "Z": 500}
trajectory = api.generate_trajectory(
start=start_xyz,
end=generate_figure_8_z(start_xyz, radius=100, loops=1, steps=1)[0], # endpoint unused
steps=100,
space="cartesian",
mode="absolute"
)
# Overwrite with full 8-curve, skipping internal interpolation
trajectory = generate_figure_8_z(start_xyz, radius=100, loops=1, steps=100)
# === Try safe update ===
try:
result = api.update_variable("RKorr.X", 1.0)
print(result)
print("✅ Safe value accepted.")
except Exception as e:
print(f"❌ Safe value was rejected unexpectedly: {e}")
print(f"✅ Generated figure-8 trajectory with {len(trajectory)} steps.")
# === Try unsafe update ===
try:
result = api.update_variable("RKorr.X", 5.0)
print(result)
print("❌ Unsafe value was accepted unexpectedly!")
except Exception as e:
print(f"✅ Correctly blocked unsafe value: {e}")
# 🧠 Start live plotting in a thread (in mode 3d)
def start_plot():
plotter = LivePlotter(api.client, mode="3d", interval=50)
plotter.start()
# === Emergency Stop ===
api.client.safety_manager.emergency_stop()
try:
api.update_variable("RKorr.X", 0.0)
print("❌ Update passed during E-STOP (this is wrong)")
except Exception as e:
print(f"✅ Correctly blocked by E-STOP: {e}")
threading.Thread(target=start_plot, daemon=True).start()
# === Reset E-Stop ===
api.client.safety_manager.reset_stop()
try:
result = api.update_variable("RKorr.X", 0.0)
print(result)
print("✅ Motion resumed after E-STOP reset.")
except Exception as e:
print(f"❌ Unexpected error after E-STOP reset: {e}")
print("🟢 Executing trajectory...")
api.execute_trajectory(trajectory, space="cartesian", rate=0.02)
# === Enable Override ===
api.override_safety(True)
try:
result = api.update_variable("RKorr.X", 999.0)
print(result)
print("✅ Override allowed unsafe motion as expected.")
except Exception as e:
print(f"❌ Override failed: {e}")
# === Check override status ===
print("🛡️ Override status:", "ACTIVE" if api.is_safety_overridden() else "INACTIVE")
time.sleep(len(trajectory) * 0.02 + 2.0)
print("✅ Trajectory execution complete.")
print("🛑 Stopping RSI...")
api.stop_rsi()
print("✅ RSI client stopped.")
if __name__ == "__main__":
main()

View File

@ -13,6 +13,7 @@ from src.RSIPI.static_plotter import StaticPlotter # Make sure this file exists
import os
from src.RSIPI.live_plotter import LivePlotter
from threading import Thread
import asyncio
class RSIAPI:
"""RSI API for programmatic control, including alerts, logging, graphing, and data retrieval."""
@ -274,7 +275,7 @@ class RSIAPI:
return f"{alert_type.capitalize()} alert threshold set to {value}"
return "❌ Invalid alert type. Use 'deviation' or 'force'."
## TODO Testing to here.
## TODO Test from here.
def generate_report(self, filename, format_type):
"""Generate a statistical report from movement data."""
@ -356,14 +357,40 @@ class RSIAPI:
except Exception as e:
return f"❌ RSI injection failed: {e}"
## TODO here
@staticmethod
def generate_trajectory(start, end, steps=100, space="cartesian"):
def generate_trajectory(start, end, steps=100, space="cartesian", mode="absolute", include_resets=False):
"""Generates a linear trajectory (Cartesian or Joint)."""
return generate_trajectory(start, end, steps, space)
return generate_trajectory(start, end, steps, space, mode, include_resets)
import asyncio
def execute_trajectory(self, trajectory, space="cartesian", rate=0.012):
"""Executes a trajectory using live updates."""
return execute_trajectory(self, trajectory, space, rate)
"""
Executes a trajectory intelligently:
- If already inside an asyncio loop -> schedules task in background
- If no loop -> creates one and runs blocking
"""
async def runner():
for idx, point in enumerate(trajectory):
if space == "cartesian":
self.update_cartesian(**point)
elif space == "joint":
self.update_joints(**point)
else:
raise ValueError("space must be 'cartesian' or 'joint'")
print(f"🚀 Step {idx + 1}/{len(trajectory)} sent")
await asyncio.sleep(rate)
try:
loop = asyncio.get_running_loop()
# If inside event loop, schedule runner as background task
asyncio.create_task(runner())
except RuntimeError:
# If no event loop is running, create and run one
asyncio.run(runner())
def queue_trajectory(self, trajectory, space="cartesian", rate=0.012):
"""Adds a trajectory to the internal queue."""

View File

@ -63,7 +63,15 @@ class RSICommandLineInterface:
elif cmd == "speed" and len(parts) == 3:
self.client.adjust_speed(parts[1], parts[2])
elif cmd == "override" and len(parts) == 2:
self.client.override_safety(parts[1])
state = parts[1]
if state in ["on", "true", "1"]:
self.client.override_safety(True)
print("🛡️ Safety override ENABLED.")
elif state in ["off", "false", "0"]:
self.client.override_safety(False)
print("🛡️ Safety override DISABLED.")
else:
print("❌ Usage: override on | off")
elif cmd == "log":
if len(parts) < 1:
print("⚠️ Usage: log start|stop|status")
@ -200,13 +208,15 @@ class RSICommandLineInterface:
elif cmd == "safety-reset":
self.client.safety_manager.reset_stop()
print("✅ Safety: Emergency Stop reset. Motion allowed.")
elif cmd == "safety-status":
sm = self.client.safety_manager
print("🧱 Safety Status: " + ("STOPPED" if sm.is_stopped() else "ACTIVE"))
print("📏 Enforced Limits:")
override_active = self.client.is_safety_overridden()
print("🛡️ Safety System Status:")
print(" 🧱 Emergency Stop:", "ACTIVE 🛑" if sm.is_stopped() else "CLEARED ✅")
print(" 🛡️ Safety Override:", "ENABLED" if override_active else "DISABLED")
print(" 📏 Enforced Limits:")
for var, (lo, hi) in sm.get_limits().items():
print(f" {var}: {lo}{hi}")
print(f" - {var}: {lo}{hi}")
elif cmd == "plot" and len(parts) >= 2:
plot_type = parts[1]
if len(parts) < 3:
@ -305,7 +315,12 @@ Available Commands:
export_movement_data <csvfile> - Export logged motion data to CSV
compare_test_runs <file1> <file2> - Compare 2 test logs (e.g. deviation)
generate_report <input.csv> [out.txt] - Create a movement analysis report
safety-stop - Emergency stop: block motion
safety-reset - Reset emergency stop
safety-status - Show safety and override status
override on/off - Enable or disable safety override
alerts on/off
set_alert_threshold <deviation|force> <value>
""")
def visualize(self, csv_file, export=False):

View File

@ -70,3 +70,41 @@ class SafetyManager:
def is_safety_overridden(self) -> bool:
"""Returns whether safety override is active."""
return self.override
@staticmethod
def check_cartesian_limits(pose):
"""
Check if a Cartesian pose is within general robot limits.
Typical bounds: ±1500 mm in XYZ, ±360° in orientation.
"""
limits = {
"X": (-1500, 1500),
"Y": (-1500, 1500),
"Z": (0, 2000),
"A": (-360, 360),
"B": (-360, 360),
"C": (-360, 360),
}
for key, (lo, hi) in limits.items():
if key in pose and not (lo <= pose[key] <= hi):
return False
return True
@staticmethod
def check_joint_limits(pose):
"""
Check if a joint-space pose is within KUKA limits.
Typical KUKA ranges: A1A6 in defined degrees.
"""
limits = {
"A1": (-185, 185),
"A2": (-185, 185),
"A3": (-185, 185),
"A4": (-350, 350),
"A5": (-130, 130),
"A6": (-350, 350),
}
for key, (lo, hi) in limits.items():
if key in pose and not (lo <= pose[key] <= hi):
return False
return True

View File

@ -1,26 +1,21 @@
from RSIPI.safety_manager import SafetyManager
import time
def generate_trajectory(start, end, steps=100, space="cartesian", mode="relative", include_resets=True):
def generate_trajectory(start, end, steps=100, space="cartesian", mode="absolute", include_resets=False):
"""
Generates a trajectory from start to end across N steps.
Args:
start (dict): Starting pose or joint values.
end (dict): Target pose or joint values.
steps (int): Number of interpolation steps.
space (str): "cartesian" or "joint" (for API dispatch and validation).
mode (str): "relative" or "absolute" movement type.
include_resets (bool): Inserts zero-correction resets between steps in relative mode.
Returns:
list[dict]: List of movement command dictionaries.
- Absolute mode (default): full poses, no resets
- Relative mode: incremental steps, optional resets after each step
"""
if mode not in ["relative", "absolute"]:
raise ValueError("mode must be 'relative' or 'absolute'")
if space not in ["cartesian", "joint"]:
raise ValueError("space must be 'cartesian' or 'joint'")
if mode == "absolute":
include_resets = False # Smart safeguard
axes = start.keys()
trajectory = []
@ -29,6 +24,7 @@ def generate_trajectory(start, end, steps=100, space="cartesian", mode="relative
global enforce_safety
enforce_safety = hasattr(SafetyManager, "check_cartesian_limits") # Enable only if those methods exist
for i in range(1, steps + 1):
point = {}
for axis in axes: