Fix send/receive variable inversion and network loop performance

The variable naming follows KUKA convention (robot's perspective) where
send_variables = what the robot sends us (RIst, RSol) and
receive_variables = what the robot receives from us (RKorr, DiO).
All APIs were using them backwards — writing corrections to
send_variables and building response XML from them, meaning the robot
never received actual corrections.

- network_handler: parse incoming XML into send_variables, build
  response XML from receive_variables, use local dict snapshots
  to avoid per-key Manager IPC within the 4ms cycle
- motion_api: check receive_variables for RKorr/AKorr
- tools_api: write user corrections to receive_variables
- monitoring_api: read robot state from send_variables
- io_api: read digital inputs from send_variables
- krl_api: read Tech.T params from send_variables
- rsi_cli/rsi_graphing: add --config arg, remove hardcoded paths
- main.py: test runner with all examples and multiprocessing guard
This commit is contained in:
Adam 2026-04-17 18:54:10 +01:00
parent 11d658730c
commit fbd8dfacdc
10 changed files with 476 additions and 44 deletions

79
CLAUDE.md Normal file
View File

@ -0,0 +1,79 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## What This Project Does
RSIPI enables real-time control of KUKA industrial robots from Python via the RSI (Robot Sensor Interface) protocol. The robot sends its position ~250 times/second over UDP, and this library lets you send back position corrections to control the robot externally.
## Build & Development Commands
```bash
# Install dependencies
pip install -e .
# Or install from requirements (if present)
pip install pandas>=2.0 numpy>=1.22 matplotlib>=3.5 lxml>=4.9 scipy>=1.8
# Run the CLI
python -m RSIPI.rsi_cli --config RSI_EthernetConfig.xml
# Run the echo server (for offline testing without a real robot)
python -m RSIPI.rsi_echo_server
```
**No test suite exists** - testing is done via the echo server simulation and example scripts in `examples/`.
## Architecture
### Core Communication Flow
```
KUKA Robot Controller <--UDP/XML--> NetworkProcess <--multiprocessing.Manager--> RSIClient <-- RSIAPI/CLI
```
1. **NetworkProcess** (`network_handler.py`) - Runs in separate process via `multiprocessing.Process`. Binds to UDP socket, receives XML from robot, parses into `receive_variables`, sends XML from `send_variables` back to robot. Uses `start_event` to wait for explicit start signal.
2. **RSIClient** (`rsi_client.py`) - Orchestrates the system. Initializes ConfigParser, SafetyManager, and NetworkProcess. Uses `multiprocessing.Manager` dicts for thread-safe variable sharing between processes.
3. **RSIAPI** (`rsi_api.py`) - High-level API wrapping RSIClient. Runs RSIClient in a daemon thread. Provides trajectory planning, logging, plotting, and safety controls.
4. **RSICommandLineInterface** (`rsi_cli.py`) - Interactive CLI that wraps RSIAPI.
### Key Shared State
Variables are shared between processes using `multiprocessing.Manager().dict()`:
- `send_variables` - Values to send to robot (RKorr corrections, digital outputs, etc.)
- `receive_variables` - Values received from robot (RIst position, ASPos joints, IPOC timestamp)
### Configuration
`RSI_EthernetConfig.xml` defines:
- Network settings (IP, port) in `<CONFIG>` section
- Send variables in `<SEND><ELEMENTS>` - what the robot receives from us
- Receive variables in `<RECEIVE><ELEMENTS>` - what we receive from robot
Variable tags like `DEF_RIst` get the `DEF_` prefix stripped and are expanded using `internal_structure` in ConfigParser to full dicts (e.g., `RIst: {X, Y, Z, A, B, C}`).
### Safety Layer
**SafetyManager** (`safety_manager.py`) validates all outgoing values against configurable limits. Can load limits from `.rsi.xml` files. Supports emergency stop and safety override modes.
### Trajectory Execution
`TrajectoryPlanner` generates interpolated waypoints. `execute_trajectory()` in RSIAPI uses asyncio to send points at specified rate (default 12ms for Cartesian, 400ms for joints).
## Important Patterns
- **IPOC synchronization**: The robot sends an IPOC (timestamp) value. The response must include `IPOC + 4` to maintain sync. This is handled automatically in `NetworkProcess.process_received_data()`.
- **Lazy client initialization**: RSIAPI uses `_ensure_client()` pattern - RSIClient is created on first use, not at RSIAPI instantiation.
- **Non-blocking start**: `start_rsi()` runs the client loop in a daemon thread. The NetworkProcess waits on `start_event` before binding the socket.
## File Locations
- Source code: `src/RSIPI/`
- Example scripts: `examples/`
- Config template: `RSI_EthernetConfig.xml`
- Logs written to: `logs/` (created at runtime)

297
main.py Normal file
View File

@ -0,0 +1,297 @@
"""
RSIPI Test Runner
=================
Uncomment one example at a time and run this file.
Uses RSI_EthernetConfig.xml from the project root by default.
Usage:
python main.py
python main.py --config path/to/other_config.xml
"""
import argparse
import time
from multiprocessing import freeze_support
from RSIPI import RSIAPI
if __name__ == '__main__':
freeze_support()
parser = argparse.ArgumentParser(description="RSIPI Test Runner")
parser.add_argument("--config", type=str, default="RSI_EthernetConfig.xml",
help="Path to RSI config XML file")
args = parser.parse_args()
api = RSIAPI(args.config)
# =========================================================================
# Example 01: Start / Stop
# =========================================================================
# api.start()
# print("RSI started. Press Enter to stop.")
# input()
# api.stop()
# =========================================================================
# Example 02: Send Cartesian correction (move TCP 50mm along X)
# =========================================================================
api.start()
print("Waiting for robot connection...")
while not api.is_running():
time.sleep(0.1)
# Wait for first IPOC to confirm packets are flowing
while api.monitoring.get_ipoc() == "N/A" or api.monitoring.get_ipoc() == 0:
time.sleep(0.1)
print(f"Connected. IPOC: {api.monitoring.get_ipoc()}")
api.motion.update_cartesian(X=0.01, Y=0, Z=0)
print("Sent Cartesian correction. Press Enter to stop.")
input()
api.stop()
# =========================================================================
# Example 03: Send Joint correction (move A1 by 10 degrees)
# =========================================================================
# api.start()
# time.sleep(1)
# api.motion.update_joints(A1=10)
# print("Sent joint correction. Press Enter to stop.")
# input()
# api.stop()
# =========================================================================
# Example 04: External axes (move E1 by 100mm)
# =========================================================================
# api.start()
# time.sleep(1)
# api.motion.move_external_axis('E1', 100)
# print("Sent external axis correction. Press Enter to stop.")
# input()
# api.stop()
# =========================================================================
# Example 05: Digital I/O (toggle outputs)
# =========================================================================
# api.start()
# time.sleep(1)
# api.io.set_output(1, True)
# api.io.set_output(2, False)
# api.io.set_output(3, True)
# print("Set digital outputs. Press Enter to stop.")
# input()
# api.stop()
# =========================================================================
# Example 06: CSV Logging
# =========================================================================
# api.logging.start()
# api.start()
# print("Logging to CSV. Press Enter to stop.")
# input()
# api.logging.stop()
# api.stop()
# =========================================================================
# Example 07: Live Graphing
# =========================================================================
# api.start()
# api.viz.start_live_plot('3d')
# print("Live graph running. Press Enter to stop.")
# input()
# api.viz.stop_live_plot()
# api.stop()
# =========================================================================
# Example 08: Safety Limits (restrict X-axis to +/-500mm)
# =========================================================================
# api.safety.set_limit("RKorr.X", -500, 500)
# api.start()
# print("Safety limits active. Press Enter to stop.")
# input()
# api.stop()
# =========================================================================
# Example 09: Cartesian Trajectory (square pattern)
# =========================================================================
# api.start()
# time.sleep(1)
# waypoints = [
# {"X": 50, "Y": 0, "Z": 0},
# {"X": 50, "Y": 50, "Z": 0},
# {"X": 0, "Y": 50, "Z": 0},
# {"X": 0, "Y": 0, "Z": 0},
# ]
# for wp in waypoints:
# api.motion.update_cartesian(**wp)
# time.sleep(0.5)
# print("Trajectory complete. Press Enter to stop.")
# input()
# api.stop()
# =========================================================================
# Example 10: Safe Shutdown (Ctrl+C handler)
# =========================================================================
# try:
# api.start()
# print("Running. Press Ctrl+C for safe shutdown.")
# while True:
# time.sleep(0.1)
# except KeyboardInterrupt:
# print("Emergency stop triggered.")
# api.safety.stop()
# api.stop()
# =========================================================================
# Coordination 01: Basic Handshake (KRL <-> Python I/O signalling)
# =========================================================================
# api.start()
# time.sleep(1)
# print("Waiting for KRL ready signal on input 1...")
# if api.krl.wait_for_signal(1, timeout=10.0):
# print("KRL ready. Processing...")
# time.sleep(1)
# api.krl.signal_complete(1)
# print("Handshake complete.")
# else:
# print("Timeout waiting for KRL signal.")
# input("Press Enter to stop.")
# api.stop()
# =========================================================================
# Coordination 02: Parameter Passing (read/write Tech variables)
# =========================================================================
# api.start()
# time.sleep(1)
# pos_x = api.krl.read_param('T11')
# pos_y = api.krl.read_param('T12')
# pos_z = api.krl.read_param('T13')
# print(f"Current position from KRL: X={pos_x}, Y={pos_y}, Z={pos_z}")
# api.krl.write_param('C11', pos_x + 50)
# api.krl.write_param('C12', pos_y)
# api.krl.write_param('C13', pos_z)
# api.krl.signal_complete(1)
# print("Parameters sent. Press Enter to stop.")
# input()
# api.stop()
# =========================================================================
# Coordination 03: State Machine (multi-state workflow)
# =========================================================================
# IDLE, CALIBRATING, READY, EXECUTING, COMPLETE, ERROR = 0, 1, 2, 3, 4, 5
# api.start()
# time.sleep(1)
# state = IDLE
# print("State machine running. Press Ctrl+C to exit.")
# try:
# while state != COMPLETE:
# krl_state = int(api.krl.read_param('T11'))
# if krl_state == CALIBRATING:
# print("Calibrating...")
# time.sleep(2)
# api.krl.write_param('C11', READY)
# state = READY
# elif krl_state == EXECUTING:
# print("Executing motion...")
# state = EXECUTING
# elif krl_state == COMPLETE:
# print("Complete.")
# state = COMPLETE
# elif krl_state == ERROR:
# print("Error detected!")
# break
# time.sleep(0.05)
# except KeyboardInterrupt:
# pass
# api.stop()
# =========================================================================
# Advanced Motion 01: Velocity Profiles (trapezoidal vs S-curve)
# =========================================================================
# api.start()
# time.sleep(1)
# traj = api.motion.generate_trajectory(
# {"X": 0, "Y": 0, "Z": 0},
# {"X": 100, "Y": 0, "Z": 0},
# steps=50)
# trap_profile = api.motion.generate_velocity_profile(
# traj, max_velocity=50, max_acceleration=100, profile='trapezoidal')
# scurve_profile = api.motion.generate_velocity_profile(
# traj, max_velocity=50, max_acceleration=100, profile='s-curve')
# print(f"Trapezoidal: {len(trap_profile)} points")
# print(f"S-curve: {len(scurve_profile)} points")
# print("Press Enter to stop.")
# input()
# api.stop()
# =========================================================================
# Advanced Motion 02: Geometric Primitives (arc, circle, spiral)
# =========================================================================
# api.start()
# time.sleep(1)
# arc = api.motion.generate_arc(
# center={"X": 0, "Y": 0, "Z": 0},
# radius=50, start_angle=0, end_angle=90, steps=20)
# circle = api.motion.generate_circle(
# center={"X": 0, "Y": 0, "Z": 0},
# radius=50, steps=36)
# spiral = api.motion.generate_spiral(
# center={"X": 0, "Y": 0, "Z": 0},
# start_radius=10, end_radius=50, pitch=5.0, revolutions=3, steps=60)
# print(f"Arc: {len(arc)} pts, Circle: {len(circle)} pts, Spiral: {len(spiral)} pts")
# print("Press Enter to stop.")
# input()
# api.stop()
# =========================================================================
# Advanced Motion 03: Path Blending (smooth corner transitions)
# =========================================================================
# api.start()
# time.sleep(1)
# traj1 = [{"X": i, "Y": 0, "Z": 0} for i in range(0, 50, 5)]
# traj2 = [{"X": 50, "Y": i, "Z": 0} for i in range(0, 50, 5)]
# blended = api.motion.blend_trajectories(traj1, traj2, blend_radius=15)
# print(f"Blended trajectory: {len(blended)} points")
# print("Press Enter to stop.")
# input()
# api.stop()
# =========================================================================
# Advanced Motion 04: Coordinate Transforms (frame conversions)
# =========================================================================
# api.start()
# time.sleep(1)
# base_pos = {"X": 500, "Y": 100, "Z": 800, "A": 0, "B": 0, "C": 0}
# world_pos = api.motion.transform_coordinates(
# base_pos, from_frame="BASE", to_frame="WORLD",
# frame_offset={"X": 100, "Y": 50, "Z": 0})
# print(f"Base: {base_pos}")
# print(f"World: {world_pos}")
# print("Press Enter to stop.")
# input()
# api.stop()
# =========================================================================
# Advanced Motion 05: Combined Motion (production drilling pattern)
# =========================================================================
# api.start()
# time.sleep(1)
# # Generate trajectory to work area
# nav_traj = api.motion.generate_trajectory(
# {"X": 0, "Y": 0, "Z": 0},
# {"X": 200, "Y": 200, "Z": 0},
# steps=30)
# # Apply S-curve velocity profile
# profiled = api.motion.generate_velocity_profile(
# nav_traj, max_velocity=80, max_acceleration=200, profile='s-curve')
# print(f"Navigation: {len(profiled)} profiled points")
# # Spiral inspection pattern
# inspection = api.motion.generate_spiral(
# center={"X": 200, "Y": 200, "Z": 0},
# start_radius=5, end_radius=40, pitch=0, revolutions=2, steps=40)
# # Drilling descent spiral
# drill = api.motion.generate_spiral(
# center={"X": 200, "Y": 200, "Z": 0},
# start_radius=3, end_radius=3, pitch=-5.0, revolutions=5, steps=50)
# print(f"Inspection: {len(inspection)} pts, Drill: {len(drill)} pts")
# print("Press Enter to stop.")
# input()
# api.stop()

View File

@ -135,16 +135,16 @@ class IOAPI:
channel_name = f"i{channel}"
var_name = f"{group}.{channel_name}"
# Navigate nested receive_variables structure
if group in self.client.receive_variables:
group_dict = self.client.receive_variables.get(group, {})
# Digital inputs come from the robot (send_variables = what robot sends us)
if group in self.client.send_variables:
group_dict = self.client.send_variables.get(group, {})
if isinstance(group_dict, dict) and channel_name in group_dict:
value = group_dict[channel_name]
return bool(value)
else:
raise RSIVariableError(f"Input channel '{channel_name}' not found in group '{group}'")
else:
raise RSIVariableError(f"Input group '{group}' not found in receive_variables")
raise RSIVariableError(f"Input group '{group}' not found in send_variables")
def pulse(self, channel: int, duration: float = 0.1, group: str = 'Digout') -> str:
"""

View File

@ -368,15 +368,15 @@ class KRLAPI:
if not (11 <= slot_num <= 199):
raise ValueError(f"Tech slot must be between 11-199, got {slot_num}")
# Read from receive_variables
if 'Tech' in self.client.receive_variables:
tech_dict = self.client.receive_variables.get('Tech', {})
# Tech.T variables are written by KRL and sent to us (send_variables)
if 'Tech' in self.client.send_variables:
tech_dict = self.client.send_variables.get('Tech', {})
var_name = f"T{slot_num}"
if isinstance(tech_dict, dict) and var_name in tech_dict:
value = tech_dict[var_name]
logging.debug(f"Read {value} from Tech.{var_name}")
return float(value)
else:
raise RSIVariableError(f"Tech.{var_name} not found in receive_variables")
raise RSIVariableError(f"Tech.{var_name} not found in send_variables")
else:
raise RSIVariableError("Tech variable group not found in receive_variables")
raise RSIVariableError("Tech variable group not found in send_variables")

View File

@ -48,11 +48,11 @@ class MonitoringAPI:
IPOC: 123456
"""
return {
"position": dict(self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0})),
"velocity": dict(self.client.receive_variables.get("Velocity", {"X": 0, "Y": 0, "Z": 0})),
"acceleration": dict(self.client.receive_variables.get("Acceleration", {"X": 0, "Y": 0, "Z": 0})),
"force": dict(self.client.receive_variables.get("MaCur", {"A1": 0, "A2": 0, "A3": 0, "A4": 0, "A5": 0, "A6": 0})),
"ipoc": self.client.receive_variables.get("IPOC", "N/A")
"position": dict(self.client.send_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0})),
"velocity": dict(self.client.send_variables.get("Velocity", {"X": 0, "Y": 0, "Z": 0})),
"acceleration": dict(self.client.send_variables.get("Acceleration", {"X": 0, "Y": 0, "Z": 0})),
"force": dict(self.client.send_variables.get("MaCur", {"A1": 0, "A2": 0, "A3": 0, "A4": 0, "A5": 0, "A6": 0})),
"ipoc": self.client.send_variables.get("IPOC", "N/A")
}
def get_live_data_as_numpy(self) -> np.ndarray:
@ -118,7 +118,7 @@ class MonitoringAPI:
>>> print(ipoc)
123456
"""
return self.client.receive_variables.get("IPOC", "N/A")
return self.client.send_variables.get("IPOC", "N/A")
def get_position(self) -> Dict[str, float]:
"""
@ -132,7 +132,7 @@ class MonitoringAPI:
>>> print(f"TCP at X={pos['X']}, Y={pos['Y']}, Z={pos['Z']}")
TCP at X=600.5, Y=-200.3, Z=1450.8
"""
return dict(self.client.receive_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0, "A": 0, "B": 0, "C": 0}))
return dict(self.client.send_variables.get("RIst", {"X": 0, "Y": 0, "Z": 0, "A": 0, "B": 0, "C": 0}))
def get_force(self) -> Dict[str, float]:
"""
@ -149,7 +149,7 @@ class MonitoringAPI:
>>> print(f"Joint A1 current: {force['A1']}")
Joint A1 current: 12.5
"""
return dict(self.client.receive_variables.get("MaCur", {"A1": 0, "A2": 0, "A3": 0, "A4": 0, "A5": 0, "A6": 0}))
return dict(self.client.send_variables.get("MaCur", {"A1": 0, "A2": 0, "A3": 0, "A4": 0, "A5": 0, "A6": 0}))
def watch_network(self, duration: Optional[float] = None, rate: float = 0.2) -> None:
"""

View File

@ -58,8 +58,8 @@ class MotionAPI:
RKorr must be configured in the RSI config file and enabled in KRL
using RSI_MOVECORR() for corrections to take effect.
"""
if "RKorr" not in self.client.send_variables:
logging.warning("RKorr not configured in send_variables. Skipping Cartesian update.")
if "RKorr" not in self.client.receive_variables:
logging.warning("RKorr not configured in receive_variables. Skipping Cartesian update.")
return
# Import here to avoid circular dependency
@ -95,8 +95,8 @@ class MotionAPI:
AKorr must be configured in the RSI config file and enabled in KRL
using RSI_MOVECORR() for corrections to take effect.
"""
if "AKorr" not in self.client.send_variables:
logging.warning("AKorr not configured in send_variables. Skipping Joint update.")
if "AKorr" not in self.client.receive_variables:
logging.warning("AKorr not configured in receive_variables. Skipping Joint update.")
return
from .tools_api import ToolsAPI

View File

@ -182,8 +182,23 @@ class NetworkProcess(multiprocessing.Process):
Receives UDP messages from robot, processes them, sends responses,
and optionally logs data to CSV. Records timing metrics if enabled.
Uses local dict snapshots to avoid per-key IPC overhead on
multiprocessing.Manager dicts within the 4ms cycle.
"""
update_counter = 0 # For periodic metrics updates
metrics_counter = 0 # For periodic receive_variables sync
# Variable naming follows KUKA convention (robot's perspective):
# send_variables = what the robot SENDS to us (RIst, RSol, IPOC, etc.)
# receive_variables = what the robot RECEIVES from us (RKorr, DiO, EStr, etc.)
#
# So: parse incoming XML → send_variables, build response XML ← receive_variables
# Local working copies — avoid Manager IPC in the hot path
local_robot_out = dict(self.send_variables) # robot's outgoing data (we read)
local_robot_in = dict(self.receive_variables) # robot's incoming data (we write)
network_settings = self.config_parser.network_settings
while not self.stop_event.is_set():
# Check for commands (non-blocking)
@ -193,14 +208,32 @@ class NetworkProcess(multiprocessing.Process):
self.udp_socket.settimeout(5)
data_received, self.controller_ip_and_port = self.udp_socket.recvfrom(1024)
message = data_received.decode()
self.process_received_data(message)
send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings)
# Parse robot's outgoing data into local dict (no IPC)
self._parse_received_data(message, local_robot_out)
# Snapshot receive_variables to pick up user changes (single IPC call)
local_robot_in = dict(self.receive_variables)
# Sync IPOC: robot sends it, we echo back IPOC+4
if "IPOC" in local_robot_out:
ipoc = local_robot_out["IPOC"]
local_robot_in["IPOC"] = ipoc + 4
self.receive_variables["IPOC"] = ipoc + 4
# Build and send response XML from our corrections (no IPC)
send_xml = XMLGenerator.generate_send_xml(local_robot_in, network_settings)
self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port)
# Sync robot's outgoing data → Manager dict periodically (every 10 cycles ~40ms)
metrics_counter += 1
if metrics_counter >= 10:
self.send_variables.update(local_robot_out)
metrics_counter = 0
# Record timing metrics (Phase 2)
if self.timing_metrics is not None:
ipoc = self.receive_variables.get("IPOC", 0)
self.timing_metrics.record_cycle(ipoc)
self.timing_metrics.record_cycle(local_robot_out.get("IPOC", 0))
# Update shared metrics dict every 100 cycles (~400ms)
update_counter += 1
@ -365,14 +398,14 @@ class NetworkProcess(multiprocessing.Process):
except (socket.error, OSError):
return False
def process_received_data(self, xml_string: str) -> None:
@staticmethod
def _parse_received_data(xml_string: str, target: dict) -> None:
"""
Parse received XML message and update receive_variables.
Handles IPOC synchronization by echoing back IPOC+4.
Parse received XML message into a local dict (no IPC).
Args:
xml_string: XML message string from robot controller
target: Plain dict to update with parsed values
Raises:
RSIPacketError: If XML parsing fails
@ -380,18 +413,30 @@ class NetworkProcess(multiprocessing.Process):
try:
root = ET.fromstring(xml_string)
for element in root:
if element.tag in self.receive_variables:
if element.tag in target:
if len(element.attrib) > 0:
self.receive_variables[element.tag] = {k: float(v) for k, v in element.attrib.items()}
target[element.tag] = {k: float(v) for k, v in element.attrib.items()}
else:
self.receive_variables[element.tag] = element.text
target[element.tag] = element.text
if element.tag == "IPOC":
received_ipoc = int(element.text)
self.receive_variables["IPOC"] = received_ipoc
self.send_variables["IPOC"] = received_ipoc + 4
target["IPOC"] = int(element.text)
except ET.ParseError as e:
logging.error(f"XML parse error in received message: {e}")
raise RSIPacketError(f"Failed to parse received XML: {e}") from e
except Exception as e:
logging.error(f"Error processing received message: {e}")
raise RSIPacketError(f"Unexpected error parsing packet: {e}") from e
def process_received_data(self, xml_string: str) -> None:
"""
Parse received XML message and update send_variables (Manager dict).
Legacy method kept for compatibility (e.g. echo server). The hot loop
uses _parse_received_data() with local dicts instead.
Args:
xml_string: XML message string from robot controller
"""
self._parse_received_data(xml_string, self.send_variables)
if "IPOC" in self.send_variables:
self.receive_variables["IPOC"] = self.send_variables["IPOC"] + 4

View File

@ -192,5 +192,11 @@ Available Commands:
""")
if __name__ == "__main__":
cli = RSICommandLineInterface("../../examples/RSI_EthernetConfig.xml")
import argparse
parser = argparse.ArgumentParser(description="RSI Command-Line Interface")
parser.add_argument("--config", type=str, default="RSI_EthernetConfig.xml",
help="Path to RSI config XML file (default: RSI_EthernetConfig.xml)")
args = parser.parse_args()
cli = RSICommandLineInterface(args.config)
cli.run()

View File

@ -183,10 +183,12 @@ if __name__ == "__main__":
parser.add_argument("--mode", choices=["position", "velocity", "acceleration", "force"], default="position", help="Graphing mode")
parser.add_argument("--overlay", action="store_true", help="Enable planned vs. actual overlay")
parser.add_argument("--plan", type=str, help="CSV file with planned trajectory")
parser.add_argument("--config", type=str, default="RSI_EthernetConfig.xml",
help="Path to RSI config XML file (default: RSI_EthernetConfig.xml)")
parser.add_argument("--alerts", action="store_true", help="Enable real-time alerts")
args = parser.parse_args()
client = RSIClient("../../examples/RSI_EthernetConfig.xml")
client = RSIClient(args.config)
graphing = RSIGraphing(client, mode=args.mode, overlay=args.overlay, plan_file=args.plan)
if not args.alerts:

View File

@ -59,24 +59,27 @@ class ToolsAPI:
"""
from .exceptions import RSIVariableError
# receive_variables = what the robot receives from us (RKorr, AKorr, DiO, Tech.C, etc.)
# send_variables = what the robot sends to us (RIst, RSol, IPOC, etc.)
# User corrections go into receive_variables.
target = self.client.receive_variables
if "." in name:
parent, child = name.split(".", 1)
full_path = f"{parent}.{child}"
if parent in self.client.send_variables:
current = dict(self.client.send_variables[parent])
# Validate using SafetyManager
if parent in target:
current = dict(target[parent])
safe_value = self.client.safety_manager.validate(full_path, float(value))
current[child] = safe_value
self.client.send_variables[parent] = current
target[parent] = current
logging.debug(f"Updated {name} to {safe_value}")
return f"Updated {name} to {safe_value}"
else:
raise RSIVariableError(f"Parent variable '{parent}' not found in send_variables")
raise RSIVariableError(f"Parent variable '{parent}' not found in receive_variables")
else:
# Top-level variable
safe_value = self.client.safety_manager.validate(name, float(value))
self.client.send_variables[name] = safe_value
target[name] = safe_value
logging.debug(f"Updated {name} to {safe_value}")
return f"Updated {name} to {safe_value}"