Made cli and api 1:1

This commit is contained in:
Adam 2025-04-25 21:59:27 +01:00
parent 68baf11c93
commit 093adb35cb
2 changed files with 149 additions and 57 deletions

View File

@ -15,8 +15,39 @@ from src.RSIPI.live_plotter import LivePlotter
from threading import Thread
import asyncio
class RSIAPI:
"""RSI API for programmatic control, including alerts, logging, graphing, and data retrieval."""
def generate_report(filename, format_type):
def __init__(self, config_file="RSI_EthernetConfig.xml"):
"""Initialize RSIAPI with an RSI client instance."""
self.thread = None
self.config_file = config_file
self.client = None # Delay instantiation
self.graph_process = None # Store graphing process
self.graphing_instance = None
self.graph_thread = None#
self.trajectory_queue = []
self.live_plotter = None
self.live_plot_thread = None
def _ensure_client(self):
"""Ensure RSIClient is initialised before use."""
if self.client is None:
from .rsi_client import RSIClient
self.client = RSIClient(self.config_file)
def start_rsi(self):
self._ensure_client()
self.thread = threading.Thread(target=self.client.start, daemon=True)
self.thread.start()
return "✅ RSI started in background."
def stop_rsi(self):
"""Stop the RSI client."""
self.client.stop()
return "✅ RSI stopped."
def generate_report(filename, format_type):
"""
Generate a statistical report from a CSV log file.
@ -62,39 +93,6 @@ def generate_report(filename, format_type):
return f"✅ Report saved as {output_path}"
class RSIAPI:
"""RSI API for programmatic control, including alerts, logging, graphing, and data retrieval."""
def __init__(self, config_file="RSI_EthernetConfig.xml"):
"""Initialize RSIAPI with an RSI client instance."""
self.thread = None
self.config_file = config_file
self.client = None # Delay instantiation
self.graph_process = None # Store graphing process
self.graphing_instance = None
self.graph_thread = None#
self.trajectory_queue = []
self.live_plotter = None
self.live_plot_thread = None
def _ensure_client(self):
"""Ensure RSIClient is initialised before use."""
if self.client is None:
from .rsi_client import RSIClient
self.client = RSIClient(self.config_file)
def start_rsi(self):
self._ensure_client()
self.thread = threading.Thread(target=self.client.start, daemon=True)
self.thread.start()
return "✅ RSI started in background."
def stop_rsi(self):
"""Stop the RSI client."""
self.client.stop()
return "✅ RSI stopped."
def update_variable(self, name, value):
if "." in name:
parent, child = name.split(".", 1)
@ -483,12 +481,24 @@ class RSIAPI:
def update_cartesian(self, **kwargs):
"""
Update Cartesian correction values (RKorr).
Example: update_cartesian(X=10.0, Y=0.0, Z=0.0)
"""
self._ensure_client()
if "RKorr" not in self.client.send_variables:
print("⚠️ Warning: RKorr not configured in send_variables. Skipping Cartesian update.")
return
for axis, value in kwargs.items():
self.update_variable(f"RKorr.{axis}", float(value))
def update_joints(self, **kwargs):
"""
Update joint correction values (AKorr).
"""
self._ensure_client()
if "AKorr" not in self.client.send_variables:
print("⚠️ Warning: AKorr not configured in send_variables. Skipping Joint update.")
return
for axis, value in kwargs.items():
self.update_variable(f"AKorr.{axis}", float(value))
@ -516,3 +526,84 @@ class RSIAPI:
except KeyboardInterrupt:
print("\n🛑 Stopped network watch.")
# --- 🤖 High-level Cartesian and Joint Trajectory Movement ---
def move_cartesian_trajectory(self, start_pose, end_pose, steps=50, rate=0.012):
"""
Generate and execute a Cartesian (TCP) movement between two poses.
Args:
start_pose (dict): e.g. {"X":0, "Y":0, "Z":500}
end_pose (dict): e.g. {"X":100, "Y":0, "Z":500}
steps (int): Number of interpolation points.
rate (float): Time between points in seconds.
"""
trajectory = self.generate_trajectory(start_pose, end_pose, steps=steps, space="cartesian")
self.execute_trajectory(trajectory, space="cartesian", rate=rate)
def move_joint_trajectory(self, start_joints, end_joints, steps=50, rate=0.4):
"""
Generate and execute a Joint-space movement between two poses.
Args:
start_joints (dict): e.g. {"A1":0, "A2":0, "A3":0, ...}
end_joints (dict): e.g. {"A1":90, "A2":0, "A3":0, ...}
steps (int): Number of interpolation points.
rate (float): Time between points in seconds.
"""
trajectory = self.generate_trajectory(start_joints, end_joints, steps=steps, space="joint")
self.execute_trajectory(trajectory, space="joint", rate=rate)
def queue_cartesian_trajectory(self, start_pose, end_pose, steps=50, rate=0.012):
"""
Generate and queue a Cartesian movement (no execution).
"""
if not isinstance(start_pose, dict) or not isinstance(end_pose, dict):
raise ValueError("❌ start_pose and end_pose must be dictionaries (e.g., {'X': 0, 'Y': 0, 'Z': 500})")
if steps <= 0:
raise ValueError("❌ Steps must be greater than zero.")
if rate <= 0:
raise ValueError("❌ Rate must be greater than zero.")
trajectory = self.generate_trajectory(start_pose, end_pose, steps=steps, space="cartesian")
self.queue_trajectory(trajectory, "cartesian", rate)
def queue_joint_trajectory(self, start_joints, end_joints, steps=50, rate=0.4):
"""
Generate and queue a Joint-space movement (no execution).
"""
if not isinstance(start_joints, dict) or not isinstance(end_joints, dict):
raise ValueError("❌ start_joints and end_joints must be dictionaries (e.g., {'A1': 0, 'A2': 0})")
if steps <= 0:
raise ValueError("❌ Steps must be greater than zero.")
if rate <= 0:
raise ValueError("❌ Rate must be greater than zero.")
trajectory = self.generate_trajectory(start_joints, end_joints, steps=steps, space="joint")
self.queue_trajectory(trajectory, "joint", rate)
# --- 🛡️ Safety Management ---
def safety_stop(self):
"""Trigger emergency stop."""
self._ensure_client()
self.client.safety_manager.emergency_stop()
def safety_reset(self):
"""Reset emergency stop."""
self._ensure_client()
self.client.safety_manager.reset_stop()
def safety_status(self):
"""Return detailed safety status."""
self._ensure_client()
sm = self.client.safety_manager
return {
"emergency_stop": sm.is_stopped(),
"safety_override": self.is_safety_overridden(),
"limits": sm.get_limits(),
}
def safety_set_limit(self, variable, lower, upper):
"""Set new safety limit bounds for a specific variable."""
self._ensure_client()
self.client.safety_manager.set_limit(variable, float(lower), float(upper))

View File

@ -44,7 +44,8 @@ class RSICommandLineInterface:
group = parts[0]
self.client.show_variables(group)
elif cmd == "ipoc":
self.client.get_ipoc()
ipoc = self.client.get_ipoc()
print(f"🛰 Current IPOC: {ipoc}")
elif cmd == "watch":
duration = float(parts[0]) if parts else None
self.client.watch_network(duration=duration)
@ -279,7 +280,7 @@ class RSICommandLineInterface:
if format_type not in ["csv", "json", "pdf"]:
print("❌ Invalid format. Use 'csv', 'json', or 'pdf'.")
return
generate_report(filename, format_type)
self.client.generate_report(filename, format_type)
print(f"✅ Report generated: {filename}.{format_type}")
@ -323,15 +324,15 @@ Available Commands:
set_alert_threshold <deviation|force> <value>
""")
def visualize(self, csv_file, export=False):
def visualise(self, csv_file, export=False):
try:
visualizer = Kukarsivisualiser(csv_file)
visualizer.plot_trajectory()
visualizer.plot_joint_positions()
visualizer.plot_force_trends()
visualiser = KukaRSIVisualiser(csv_file)
visualiser.plot_trajectory()
visualiser.plot_joint_positions()
visualiser.plot_force_trends()
if export:
visualizer.export_graphs()
visualiser.export_graphs()
print(f"✅ Visualisations exported for '{csv_file}'")
except Exception as e:
print(f"❌ Failed to visualize '{csv_file}': {e}")