From 6e0b87b9455831cc8d3eb4e37d8f444301e03613 Mon Sep 17 00:00:00 2001 From: Adam Date: Sat, 17 Jan 2026 00:38:32 +0000 Subject: [PATCH] Implement Phase 3: KRL Coordination MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Complete implementation of Python-KRL coordination features for seamless bidirectional communication between RSIPI and KUKA KRL programs. ## IOAPI Enhancements Added high-level I/O control methods for convenient digital I/O manipulation: - **set_output(channel, value, group='Digout')** - Set digital output by channel number - **get_input(channel, group='Digin')** - Read digital input by channel number - **pulse(channel, duration=0.1, group='Digout')** - Generate timed pulse on output Benefits: - Simpler channel-based addressing (channel 1 instead of 'Digout.o1') - Automatic channel name formatting - Built-in pulse generation for pneumatic actuators and signaling - Consistent error handling ## KRLAPI Enhancements Added coordination helper methods for Python-KRL synchronization: - **wait_for_signal(channel, timeout=5.0)** - Block until KRL sets I/O signal - **signal_complete(channel)** - Signal KRL that Python operation is complete - **write_param(slot, value)** - Write to Tech.C variables (Python → KRL) - **read_param(slot)** - Read from Tech.T variables (KRL → Python) Features: - Configurable timeouts with proper error handling - Flexible slot addressing (11, 'C11', 'c11' all work) - Slot validation (enforces 11-199 range) - Comprehensive logging for debugging - Clear docstrings with KRL code examples ## KRL Template Library Created comprehensive KRL templates demonstrating coordination patterns: **templates/krl/basic_handshake.src** - Simple I/O handshaking (KRL signals → Python waits → Python signals back) - Timeout handling and error recovery - Complete Python code examples in comments **templates/krl/parameter_passing.src** - Bidirectional Tech variable communication - KRL writes position to Tech.T, Python reads - Python calculates target, writes to Tech.C, KRL reads - Demonstrates full parameter exchange workflow **templates/krl/state_machine.src** - Multi-state coordination workflow - States: IDLE, CALIBRATING, READY, EXECUTING, COMPLETE, ERROR - Combines I/O signals and Tech variables - Error handling and timeout mechanisms - Demonstrates complex production-ready pattern **templates/krl/README.md** - Comprehensive coordination patterns documentation - Tech variable mapping conventions (C vs T variables) - I/O signal mapping standards - Timing best practices - Troubleshooting guide ## Python Coordination Examples Created production-ready Python examples demonstrating all coordination methods: **examples/coordination/01_basic_handshake.py** - Simple I/O handshake demonstration - Matches basic_handshake.src template - Command-line interface with argparse - Comprehensive logging and error handling **examples/coordination/02_parameter_passing.py** - Parameter exchange workflow - Reads position from KRL (Tech.T) - Calculates target position - Writes target to KRL (Tech.C) - Matches parameter_passing.src template **examples/coordination/03_state_machine.py** - Complex multi-state coordination - State monitoring loop with enum - Calibration routine with offset calculation - Error detection and signaling - Matches state_machine.src template **examples/coordination/README.md** - Complete usage instructions - Configuration requirements - Troubleshooting section - Customization examples - Advanced usage patterns ## Modified Files src/RSIPI/io_api.py: - Added time import - Implemented set_output() method - Implemented get_input() method with navigation of receive_variables - Implemented pulse() method with blocking time.sleep() - Comprehensive docstrings with examples src/RSIPI/krl_api.py: - Added time import - Implemented wait_for_signal() with configurable polling - Implemented signal_complete() method - Implemented write_param() with slot normalization and validation - Implemented read_param() with slot normalization and validation - KRL code examples in all docstrings ## New Directories templates/krl/ - 3 KRL program templates - Comprehensive README with patterns and conventions examples/coordination/ - 3 Python example scripts - Complete usage documentation ## Design Decisions **I/O Channel Numbering**: 1-based to match KUKA conventions **Tech Variable Slots**: Validated 11-199 range (KUKA reserves 1-10) **Blocking Operations**: wait_for_signal() and pulse() block with configurable timeouts **Error Handling**: Proper exceptions with clear messages **Logging**: Debug/Info/Warning levels for all operations **Documentation**: Every method includes KRL code examples ## Phase 3 Status: ✅ COMPLETE All planned features implemented: - ✅ High-level Digital I/O API - ✅ KRL state coordination helpers - ✅ Parameter passing via Tech variables - ✅ KRL code templates - ✅ Python coordination examples - ✅ Comprehensive documentation Next: Phase 4 (Advanced Motion Control) --- examples/coordination/01_basic_handshake.py | 113 +++++ examples/coordination/02_parameter_passing.py | 133 ++++++ examples/coordination/03_state_machine.py | 208 +++++++++ examples/coordination/README.md | 345 +++++++++++++++ src/RSIPI/io_api.py | 155 ++++++- src/RSIPI/krl_api.py | 283 ++++++++++++- templates/krl/README.md | 393 ++++++++++++++++++ templates/krl/basic_handshake.src | 80 ++++ templates/krl/parameter_passing.src | 125 ++++++ templates/krl/state_machine.src | 189 +++++++++ 10 files changed, 1994 insertions(+), 30 deletions(-) create mode 100644 examples/coordination/01_basic_handshake.py create mode 100644 examples/coordination/02_parameter_passing.py create mode 100644 examples/coordination/03_state_machine.py create mode 100644 examples/coordination/README.md create mode 100644 templates/krl/README.md create mode 100644 templates/krl/basic_handshake.src create mode 100644 templates/krl/parameter_passing.src create mode 100644 templates/krl/state_machine.src diff --git a/examples/coordination/01_basic_handshake.py b/examples/coordination/01_basic_handshake.py new file mode 100644 index 0000000..efcf2e9 --- /dev/null +++ b/examples/coordination/01_basic_handshake.py @@ -0,0 +1,113 @@ +""" +Basic I/O Handshake Example + +Demonstrates simple bidirectional signaling between Python and KRL using +digital I/O channels. Works with templates/krl/basic_handshake.src + +Flow: +1. KRL signals "ready" on output 1 +2. Python waits for signal +3. Python performs processing +4. Python signals "complete" on input 1 +5. KRL continues + +Usage: + python 01_basic_handshake.py --config RSI_EthernetConfig.xml +""" + +import argparse +import time +import logging +from RSIPI import RSIAPI + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) + + +def basic_handshake_example(config_file: str) -> None: + """ + Execute basic I/O handshake with KRL program. + + Args: + config_file: Path to RSI configuration XML file + """ + # Initialize API + api = RSIAPI(config_file) + + try: + # Start RSI communication + logging.info("Starting RSI communication...") + api.start() + logging.info("✅ RSI started successfully") + + # Wait for KRL to signal ready (digital output 1) + logging.info("Waiting for KRL ready signal...") + + if api.krl.wait_for_signal(1, timeout=30.0): + logging.info("✅ KRL signaled ready!") + + # Simulate Python processing (e.g., data analysis, sensor reading) + logging.info("Performing Python-side processing...") + time.sleep(2.0) # Simulated processing time + + # Optional: Do actual work here + # process_sensor_data() + # calculate_corrections() + # update_database() + + logging.info("✅ Processing complete") + + # Signal completion back to KRL (digital input 1) + api.krl.signal_complete(1) + logging.info("✅ Signaled KRL to continue") + + # KRL will now proceed with its motion program + logging.info("KRL is now free to continue motion") + + else: + logging.error("❌ Timeout waiting for KRL ready signal") + logging.error("Check that KRL program is running and I/O is configured correctly") + + except KeyboardInterrupt: + logging.warning("\n⚠️ Interrupted by user") + + except Exception as e: + logging.error(f"❌ Error during handshake: {e}") + + finally: + # Clean shutdown + logging.info("Stopping RSI communication...") + api.stop() + logging.info("✅ API stopped successfully") + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser(description='Basic I/O Handshake Example') + parser.add_argument( + '--config', + type=str, + default='RSI_EthernetConfig.xml', + help='Path to RSI configuration file' + ) + + args = parser.parse_args() + + logging.info("=" * 60) + logging.info("RSIPI - Basic I/O Handshake Example") + logging.info("=" * 60) + logging.info(f"Config: {args.config}") + logging.info("=" * 60) + + basic_handshake_example(args.config) + + logging.info("=" * 60) + logging.info("Example complete!") + logging.info("=" * 60) + + +if __name__ == '__main__': + main() diff --git a/examples/coordination/02_parameter_passing.py b/examples/coordination/02_parameter_passing.py new file mode 100644 index 0000000..1000488 --- /dev/null +++ b/examples/coordination/02_parameter_passing.py @@ -0,0 +1,133 @@ +""" +Parameter Passing Example + +Demonstrates bidirectional numerical data exchange between Python and KRL +using RSI Tech variables. Works with templates/krl/parameter_passing.src + +Flow: +1. KRL writes current position to Tech.T11-T16 +2. Python waits for data ready signal +3. Python reads position from Tech.T +4. Python calculates target and writes to Tech.C11-C13 +5. Python signals completion +6. KRL reads target from Tech.C and executes motion + +Usage: + python 02_parameter_passing.py --config RSI_EthernetConfig.xml +""" + +import argparse +import logging +from RSIPI import RSIAPI + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) + + +def parameter_passing_example(config_file: str) -> None: + """ + Execute parameter passing coordination with KRL program. + + Reads current position from KRL, calculates target position, + and sends target back to KRL for execution. + + Args: + config_file: Path to RSI configuration XML file + """ + api = RSIAPI(config_file) + + try: + logging.info("Starting RSI communication...") + api.start() + logging.info("✅ RSI started successfully") + + # Wait for KRL to signal that data is ready + logging.info("Waiting for KRL data ready signal...") + + if api.krl.wait_for_signal(1, timeout=30.0): + logging.info("✅ KRL signaled data ready!") + + # Read current position from Tech.T variables + logging.info("Reading current position from KRL...") + current_x = api.krl.read_param('T11') + current_y = api.krl.read_param('T12') + current_z = api.krl.read_param('T13') + current_a = api.krl.read_param('T14') + current_b = api.krl.read_param('T15') + current_c = api.krl.read_param('T16') + + logging.info(f"Current position:") + logging.info(f" X: {current_x:.2f} mm") + logging.info(f" Y: {current_y:.2f} mm") + logging.info(f" Z: {current_z:.2f} mm") + logging.info(f" A: {current_a:.2f}°, B: {current_b:.2f}°, C: {current_c:.2f}°") + + # Calculate target position (example: move 100mm in X, 50mm in Y) + logging.info("Calculating target position...") + target_x = current_x + 100.0 # Move 100mm in X + target_y = current_y + 50.0 # Move 50mm in Y + target_z = current_z + 0.0 # Keep Z constant + + logging.info(f"Calculated target:") + logging.info(f" X: {target_x:.2f} mm (+100mm)") + logging.info(f" Y: {target_y:.2f} mm (+50mm)") + logging.info(f" Z: {target_z:.2f} mm (no change)") + + # Write target position to Tech.C variables for KRL to read + logging.info("Writing target position to KRL...") + api.krl.write_param('C11', target_x) + api.krl.write_param('C12', target_y) + api.krl.write_param('C13', target_z) + logging.info("✅ Target position written to Tech.C") + + # Signal KRL that calculation is complete + api.krl.signal_complete(1) + logging.info("✅ Signaled KRL that target is ready") + + # KRL will now read target and execute motion + logging.info("KRL will now execute motion to calculated target") + + else: + logging.error("❌ Timeout waiting for KRL data ready signal") + + except KeyboardInterrupt: + logging.warning("\n⚠️ Interrupted by user") + + except Exception as e: + logging.error(f"❌ Error during parameter passing: {e}") + + finally: + logging.info("Stopping RSI communication...") + api.stop() + logging.info("✅ API stopped successfully") + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser(description='Parameter Passing Example') + parser.add_argument( + '--config', + type=str, + default='RSI_EthernetConfig.xml', + help='Path to RSI configuration file' + ) + + args = parser.parse_args() + + logging.info("=" * 60) + logging.info("RSIPI - Parameter Passing Example") + logging.info("=" * 60) + logging.info(f"Config: {args.config}") + logging.info("=" * 60) + + parameter_passing_example(args.config) + + logging.info("=" * 60) + logging.info("Example complete!") + logging.info("=" * 60) + + +if __name__ == '__main__': + main() diff --git a/examples/coordination/03_state_machine.py b/examples/coordination/03_state_machine.py new file mode 100644 index 0000000..360639c --- /dev/null +++ b/examples/coordination/03_state_machine.py @@ -0,0 +1,208 @@ +""" +State Machine Coordination Example + +Demonstrates a multi-state coordination workflow between Python and KRL. +Works with templates/krl/state_machine.src + +States: + 0: IDLE - Waiting to start + 1: CALIBRATING - Python performing calibration + 2: READY - Calibration complete, ready for motion + 3: EXECUTING - Robot executing motion task + 4: COMPLETE - Task finished + 9: ERROR - Error condition + +Usage: + python 03_state_machine.py --config RSI_EthernetConfig.xml +""" + +import argparse +import time +import logging +from enum import IntEnum +from RSIPI import RSIAPI + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) + + +class State(IntEnum): + """State machine states matching KRL program.""" + IDLE = 0 + CALIBRATING = 1 + READY = 2 + EXECUTING = 3 + COMPLETE = 4 + ERROR = 9 + + +def perform_calibration() -> tuple[float, float, float]: + """ + Perform calibration routine. + + In a real application, this would: + - Read sensor data + - Analyze calibration target + - Calculate position offsets + - Validate results + + Returns: + Tuple of (offset_x, offset_y, offset_z) in mm + """ + logging.info("Performing calibration routine...") + + # Simulate calibration processing + time.sleep(2.0) + + # Example calibration results (in real app, calculated from sensors) + offset_x = 5.0 + offset_y = -2.0 + offset_z = 0.5 + + logging.info(f"Calibration complete:") + logging.info(f" Offset X: {offset_x:+.2f} mm") + logging.info(f" Offset Y: {offset_y:+.2f} mm") + logging.info(f" Offset Z: {offset_z:+.2f} mm") + + return (offset_x, offset_y, offset_z) + + +def state_machine_example(config_file: str) -> None: + """ + Execute state machine coordination with KRL program. + + Monitors KRL state transitions and responds appropriately + to each state change. + + Args: + config_file: Path to RSI configuration XML file + """ + api = RSIAPI(config_file) + + try: + logging.info("Starting RSI communication...") + api.start() + logging.info("✅ RSI started successfully") + + # Main state monitoring loop + logging.info("Monitoring state machine...") + last_state = None + calibration_offsets = (0.0, 0.0, 0.0) + + while True: + # Read current state from KRL + current_state = int(api.krl.read_param('T11')) + + # Only log state changes + if current_state != last_state: + logging.info(f"State changed: {State(last_state or 0).name} → {State(current_state).name}") + last_state = current_state + + # Handle each state + if current_state == State.IDLE: + # Waiting for KRL to start + logging.debug("Waiting in IDLE state...") + time.sleep(0.5) + + elif current_state == State.CALIBRATING: + logging.info("✅ State: CALIBRATING - Starting calibration") + + # Perform calibration + calibration_offsets = perform_calibration() + + # Write calibration results to Tech.C for KRL + logging.info("Writing calibration offsets to KRL...") + api.krl.write_param('C12', calibration_offsets[0]) # X offset + api.krl.write_param('C13', calibration_offsets[1]) # Y offset + api.krl.write_param('C14', calibration_offsets[2]) # Z offset + + # Signal calibration complete + api.krl.signal_complete(1) + logging.info("✅ Calibration complete, signaled KRL") + + elif current_state == State.READY: + logging.info("✅ State: READY - System ready for motion") + # KRL is ready to execute, no action needed from Python + time.sleep(0.1) + + elif current_state == State.EXECUTING: + logging.info("✅ State: EXECUTING - Robot in motion") + + # Monitor execution (could send real-time corrections here) + # Example: Send RSI corrections based on sensor feedback + # api.motion.update_cartesian(X=calibration_offsets[0]) + + # For this example, just monitor + time.sleep(0.1) + + elif current_state == State.COMPLETE: + logging.info("✅ State: COMPLETE - Task finished successfully!") + logging.info("State machine workflow complete") + break # Exit loop, task is done + + elif current_state == State.ERROR: + logging.error("❌ State: ERROR - Error detected in KRL program!") + logging.error("Aborting state machine workflow") + break # Exit loop, error occurred + + else: + logging.warning(f"⚠️ Unknown state: {current_state}") + time.sleep(0.1) + + # Prevent tight loop + time.sleep(0.05) # Check state every 50ms + + logging.info("State machine monitoring ended") + + except KeyboardInterrupt: + logging.warning("\n⚠️ Interrupted by user") + # Signal error to KRL + try: + api.io.set_output(2, True) # Error signal + logging.info("Signaled error to KRL") + except: + pass + + except Exception as e: + logging.error(f"❌ Error during state machine: {e}") + # Signal error to KRL + try: + api.io.set_output(2, True) + except: + pass + + finally: + logging.info("Stopping RSI communication...") + api.stop() + logging.info("✅ API stopped successfully") + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser(description='State Machine Coordination Example') + parser.add_argument( + '--config', + type=str, + default='RSI_EthernetConfig.xml', + help='Path to RSI configuration file' + ) + + args = parser.parse_args() + + logging.info("=" * 60) + logging.info("RSIPI - State Machine Coordination Example") + logging.info("=" * 60) + logging.info(f"Config: {args.config}") + logging.info("=" * 60) + + state_machine_example(args.config) + + logging.info("=" * 60) + logging.info("Example complete!") + logging.info("=" * 60) + + +if __name__ == '__main__': + main() diff --git a/examples/coordination/README.md b/examples/coordination/README.md new file mode 100644 index 0000000..840785f --- /dev/null +++ b/examples/coordination/README.md @@ -0,0 +1,345 @@ +# Python-KRL Coordination Examples + +This directory contains Python examples demonstrating Python-KRL coordination patterns using RSIPI Phase 3 features. + +## Prerequisites + +- RSIPI library installed (`pip install -e .` from rsi-pi directory) +- KUKA robot controller with RSI 3.3 configured +- RSI_EthernetConfig.xml configured with required I/O and Tech variables +- Corresponding KRL programs uploaded to robot controller (see `templates/krl/`) + +## Examples + +### 01_basic_handshake.py +**Simple I/O handshaking** + +Demonstrates basic bidirectional signaling using digital I/O channels. + +**Requires**: `templates/krl/basic_handshake.src` running on robot + +**Run**: +```bash +python 01_basic_handshake.py --config path/to/RSI_EthernetConfig.xml +``` + +**Flow**: +1. Start Python script +2. Execute KRL program on teach pendant +3. Python waits for KRL ready signal +4. Python performs processing +5. Python signals completion +6. KRL continues with motion + +**API Features Demonstrated**: +- `api.krl.wait_for_signal(channel, timeout)` +- `api.krl.signal_complete(channel)` + +--- + +### 02_parameter_passing.py +**Bidirectional parameter exchange** + +Demonstrates numerical data exchange using RSI Tech variables. + +**Requires**: `templates/krl/parameter_passing.src` running on robot + +**Run**: +```bash +python 02_parameter_passing.py --config path/to/RSI_EthernetConfig.xml +``` + +**Flow**: +1. KRL writes current position to Tech.T variables +2. Python reads position data +3. Python calculates target position +4. Python writes target to Tech.C variables +5. Python signals completion +6. KRL reads target and executes motion + +**API Features Demonstrated**: +- `api.krl.read_param(slot)` - Read Tech.T variables +- `api.krl.write_param(slot, value)` - Write Tech.C variables +- `api.krl.wait_for_signal(channel, timeout)` +- `api.krl.signal_complete(channel)` + +--- + +### 03_state_machine.py +**Multi-state workflow coordination** + +Demonstrates complex state machine with error handling and calibration. + +**Requires**: `templates/krl/state_machine.src` running on robot + +**Run**: +```bash +python 03_state_machine.py --config path/to/RSI_EthernetConfig.xml +``` + +**Flow**: +1. Python monitors state variable (Tech.T11) +2. Calibration state: Python performs calibration routine +3. Python writes calibration offsets to Tech.C +4. Executing state: KRL uses offsets for motion +5. Complete state: Workflow finishes successfully + +**States**: +- 0: IDLE - Waiting to start +- 1: CALIBRATING - Python calibration in progress +- 2: READY - Ready for motion +- 3: EXECUTING - Robot in motion +- 4: COMPLETE - Task finished +- 9: ERROR - Error condition + +**API Features Demonstrated**: +- All coordination methods from examples 01 and 02 +- State monitoring loop +- Error handling with I/O signals +- Real-time state transitions + +--- + +## Configuration Requirements + +### RSI XML Configuration + +Your `RSI_EthernetConfig.xml` must include the following elements: + +**Digital I/O:** +```xml + + + + + + + + + + +``` + +**Tech Variables:** +```xml + + + + + + + + + + +``` + +### Network Settings + +Ensure your RSI network settings match: +```xml +192.168.1.100 +49152 +ImFree +``` + +## Running Examples + +### Step 1: Start Python Script + +```bash +# In one terminal +cd examples/coordination +python 01_basic_handshake.py --config ../../RSI_EthernetConfig.xml +``` + +### Step 2: Execute KRL Program + +1. Upload corresponding KRL program to robot controller +2. Switch to AUTO mode on teach pendant +3. Select and execute the KRL program +4. Monitor coordination in Python terminal + +### Step 3: Monitor Output + +Python will log: +- State transitions +- I/O signal changes +- Parameter reads/writes +- Errors and warnings + +**Example Output**: +``` +2026-01-17 14:32:01 - INFO - Starting RSI communication... +2026-01-17 14:32:01 - INFO - ✅ RSI started successfully +2026-01-17 14:32:01 - INFO - Waiting for KRL ready signal... +2026-01-17 14:32:05 - INFO - ✅ KRL signaled ready! +2026-01-17 14:32:05 - INFO - Performing Python-side processing... +2026-01-17 14:32:07 - INFO - ✅ Processing complete +2026-01-17 14:32:07 - INFO - ✅ Signaled KRL to continue +``` + +## Customizing Examples + +### Modify Processing Logic + +In `01_basic_handshake.py`: +```python +# Replace simulated processing +time.sleep(2.0) + +# With actual processing +sensor_data = read_sensor() +processed_result = analyze_data(sensor_data) +update_database(processed_result) +``` + +### Change Calculation Logic + +In `02_parameter_passing.py`: +```python +# Modify target calculation +target_x = current_x + 100.0 # Original +target_x = calculate_adaptive_target(current_x, sensor_feedback) # Custom +``` + +### Extend State Machine + +In `03_state_machine.py`: +```python +# Add new states +class State(IntEnum): + IDLE = 0 + CALIBRATING = 1 + INSPECTING = 2 # NEW STATE + READY = 3 # Renumber subsequent states + # ... + +# Handle new state +elif current_state == State.INSPECTING: + inspection_result = perform_inspection() + api.krl.write_param('C20', inspection_result) + api.krl.signal_complete(1) +``` + +## Troubleshooting + +### "Timeout waiting for KRL signal" + +**Problem**: Python doesn't receive expected I/O signal from KRL + +**Solutions**: +1. Verify KRL program is running on robot +2. Check I/O configuration in RSI XML +3. Verify network connectivity (ping robot IP) +4. Check signal mapping ($OUT[1] → Digout.o1) +5. Increase timeout: `api.krl.wait_for_signal(1, timeout=60.0)` + +### "RSIVariableError: Tech.T11 not found" + +**Problem**: Tech variable not in receive_variables + +**Solutions**: +1. Add Tech variables to RSI XML `` section +2. Restart robot controller after XML changes +3. Verify variable configuration: `api.tools.show_variables()` + +### "Connection refused" + +**Problem**: Cannot connect to robot controller + +**Solutions**: +1. Check robot IP address in RSI XML +2. Verify robot is in correct mode (T1/T2/AUTO) +3. Ensure firewall allows UDP port 49152 +4. Check RSI is enabled on robot controller + +### KRL Program Halts + +**Problem**: KRL program stops unexpectedly + +**Solutions**: +1. Check KRL timeout values (increase if needed) +2. Verify Python script is running before KRL execution +3. Check error signals ($IN[2] for error condition) +4. Review KRL logs on teach pendant + +## Advanced Usage + +### Non-Blocking Monitoring + +For continuous operation without blocking: + +```python +import threading + +def monitor_state(): + while running: + state = api.krl.read_param('T11') + if state == CRITICAL_STATE: + handle_critical_state() + time.sleep(0.1) + +# Run monitoring in background thread +monitor_thread = threading.Thread(target=monitor_state, daemon=True) +monitor_thread.start() + +# Main thread does other work +perform_other_tasks() +``` + +### Multiple Coordination Channels + +Use different I/O channels for parallel coordination: + +```python +# Channel 1: Main workflow +if api.krl.wait_for_signal(1): + api.krl.signal_complete(1) + +# Channel 2: Emergency stop +if api.io.get_input(2): # Emergency input + logging.error("Emergency stop!") + api.safety.stop() + +# Channel 3: Auxiliary signaling +api.io.pulse(3, duration=0.1) # Quick pulse signal +``` + +### Integration with Motion Control + +Combine coordination with real-time RSI corrections: + +```python +# Wait for motion start +api.krl.wait_for_signal(1) + +# Send real-time corrections during KRL motion +for i in range(100): + sensor_offset = get_sensor_offset() + api.motion.update_cartesian(X=sensor_offset) + time.sleep(0.004) # 250Hz + +# Signal motion complete +api.krl.signal_complete(1) +``` + +## Next Steps + +1. **Test examples** with your robot controller +2. **Adapt templates** to your specific application +3. **Implement error recovery** mechanisms +4. **Create custom workflows** for your use case +5. **Document coordination** protocols for your team + +## References + +- [RSIPI API Documentation](../../README.md) +- [KRL Templates](../../templates/krl/README.md) +- [Phase 3 Summary](../../PHASE_3_SUMMARY.md) (when available) +- [KUKA RSI 3.3 Manual](https://www.kuka.com) + +--- + +**Last Updated**: January 17, 2026 +**RSIPI Version**: 2.0.0 diff --git a/src/RSIPI/io_api.py b/src/RSIPI/io_api.py index e79d1ec..b28efc0 100644 --- a/src/RSIPI/io_api.py +++ b/src/RSIPI/io_api.py @@ -1,7 +1,8 @@ """Digital I/O API namespace for RSIPI.""" import logging -from typing import Union, TYPE_CHECKING +import time +from typing import Union, Optional, TYPE_CHECKING if TYPE_CHECKING: from .rsi_client import RSIClient @@ -61,15 +62,143 @@ class IOAPI: logging.debug(f"I/O {var_name} set to {state_value}") return result - # TODO (Phase 3): Implement high-level I/O helpers - # def set_output(self, channel: int, value: bool) -> str: - # """Set digital output by channel number.""" - # pass - # - # def get_input(self, channel: int) -> bool: - # """Read digital input by channel number.""" - # pass - # - # def pulse(self, channel: int, duration: float = 0.1) -> None: - # """Generate a pulse on the specified output channel.""" - # pass + def set_output(self, channel: int, value: bool, group: str = 'Digout') -> str: + """ + Set digital output by channel number. + + High-level wrapper for setting digital outputs. More convenient than + toggle() when working with standard digital output channels. + + Args: + channel: Output channel number (1-based, e.g., 1 for o1) + value: Desired state (True = ON, False = OFF) + group: I/O group name (default: 'Digout') + + Returns: + Status message indicating success + + Raises: + RSIVariableError: If the output channel doesn't exist + RSISafetyViolation: If safety checks prevent the operation + + Example: + >>> api.io.set_output(1, True) # Turn ON output 1 + 'Updated Digout.o1 to 1' + >>> api.io.set_output(3, False) # Turn OFF output 3 + 'Updated Digout.o3 to 0' + >>> api.io.set_output(5, True, group='DiO') # Custom group + 'Updated DiO.5 to 1' + + Note: + The default group 'Digout' corresponds to standard KUKA digital + outputs configured in RSI. Channel numbering starts at 1 to match + KUKA controller conventions. + """ + channel_name = f"o{channel}" + return self.toggle(group, channel_name, value) + + def get_input(self, channel: int, group: str = 'Digin') -> bool: + """ + Read digital input by channel number. + + High-level wrapper for reading digital input states from the robot + controller. Returns current state as boolean. + + Args: + channel: Input channel number (1-based, e.g., 1 for i1) + group: I/O group name (default: 'Digin') + + Returns: + True if input is HIGH/ON, False if LOW/OFF + + Raises: + RSIVariableError: If the input channel doesn't exist in receive_variables + + Example: + >>> # Check if input 1 is active + >>> if api.io.get_input(1): + ... print("Sensor triggered!") + Sensor triggered! + + >>> # Read from custom group + >>> state = api.io.get_input(5, group='DiI') + >>> print(f"Input 5 state: {state}") + Input 5 state: True + + Note: + This reads from receive_variables, which contains the robot + controller's current I/O state. Values are updated every RSI + cycle (~4ms). + """ + from .exceptions import RSIVariableError + + channel_name = f"i{channel}" + var_name = f"{group}.{channel_name}" + + # Navigate nested receive_variables structure + if group in self.client.receive_variables: + group_dict = self.client.receive_variables.get(group, {}) + if isinstance(group_dict, dict) and channel_name in group_dict: + value = group_dict[channel_name] + return bool(value) + else: + raise RSIVariableError(f"Input channel '{channel_name}' not found in group '{group}'") + else: + raise RSIVariableError(f"Input group '{group}' not found in receive_variables") + + def pulse(self, channel: int, duration: float = 0.1, group: str = 'Digout') -> str: + """ + Generate a timed pulse on the specified output channel. + + Turns the output ON, waits for the specified duration, then turns it OFF. + Useful for triggering pneumatic actuators, solenoids, or signaling events. + + Args: + channel: Output channel number (1-based) + duration: Pulse duration in seconds (default: 0.1 = 100ms) + group: I/O group name (default: 'Digout') + + Returns: + Status message indicating completion + + Raises: + RSIVariableError: If the output channel doesn't exist + RSISafetyViolation: If safety checks prevent the operation + + Example: + >>> # 100ms pulse on output 2 + >>> api.io.pulse(2) + 'Pulse completed on Digout.o2 (duration: 0.1s)' + + >>> # 500ms pulse on output 5 + >>> api.io.pulse(5, duration=0.5) + 'Pulse completed on Digout.o5 (duration: 0.5s)' + + >>> # Trigger pneumatic gripper on custom channel + >>> api.io.pulse(3, duration=0.2, group='DiO') + 'Pulse completed on DiO.o3 (duration: 0.2s)' + + Warning: + This method blocks for the duration of the pulse. For non-blocking + pulses, consider using threading or async I/O patterns. + + Note: + Pulse timing accuracy depends on system load and RSI cycle time. + For critical timing requirements, consider hardware-timed outputs + or KRL-based pulse generation. + """ + channel_name = f"o{channel}" + var_name = f"{group}.{channel_name}" + + # Turn ON + self.set_output(channel, True, group=group) + logging.debug(f"Pulse started on {var_name}") + + # Wait for duration + time.sleep(duration) + + # Turn OFF + self.set_output(channel, False, group=group) + logging.info(f"Pulse completed on {var_name} (duration: {duration}s)") + + return f"Pulse completed on {var_name} (duration: {duration}s)" diff --git a/src/RSIPI/krl_api.py b/src/RSIPI/krl_api.py index 36b3da2..3af83d3 100644 --- a/src/RSIPI/krl_api.py +++ b/src/RSIPI/krl_api.py @@ -1,7 +1,8 @@ """KRL program manipulation API namespace for RSIPI.""" import logging -from typing import Optional, TYPE_CHECKING +import time +from typing import Optional, Union, TYPE_CHECKING if TYPE_CHECKING: from .rsi_client import RSIClient @@ -115,19 +116,267 @@ class KRLAPI: logging.error(f"RSI injection failed: {e}") return f"RSI injection failed: {e}" - # TODO (Phase 3): Implement KRL coordination helpers - # def wait_for_signal(self, channel: int, timeout: float = 5.0) -> bool: - # """Wait for KRL to set a specific I/O signal.""" - # pass - # - # def signal_complete(self, channel: int) -> None: - # """Signal to KRL that Python-side operation is complete.""" - # pass - # - # def write_param(self, slot: str, value: float) -> str: - # """Write parameter to Tech.C variable for KRL to read.""" - # pass - # - # def read_param(self, slot: str) -> float: - # """Read parameter from Tech.T variable written by KRL.""" - # pass + def wait_for_signal( + self, + channel: int, + timeout: float = 5.0, + check_interval: float = 0.01, + group: str = 'Digin' + ) -> bool: + """ + Wait for KRL to set a specific I/O signal. + + Blocks until the specified digital input becomes HIGH, or timeout occurs. + Commonly used for synchronization where Python waits for KRL to signal + completion of a robot operation before proceeding. + + Args: + channel: Input channel number to monitor (1-based) + timeout: Maximum wait time in seconds (default: 5.0) + check_interval: Polling interval in seconds (default: 0.01 = 10ms) + group: I/O group name (default: 'Digin') + + Returns: + True if signal received, False if timeout occurred + + Example: + >>> # Wait for KRL to signal ready on input 3 + >>> if api.krl.wait_for_signal(3, timeout=10.0): + ... print("KRL signaled ready!") + ... # Proceed with Python-side processing + ... else: + ... print("Timeout waiting for KRL signal") + + >>> # Handshake pattern: Python waits → KRL signals → Python continues + >>> api.motion.update_cartesian(X=100) # Send correction + >>> api.krl.wait_for_signal(1) # Wait for KRL to acknowledge + >>> # KRL has processed the correction, safe to continue + + Note: + This is a blocking operation. The check_interval determines polling + frequency - lower values provide faster response but higher CPU usage. + For typical RSI applications, 10-50ms intervals are appropriate. + + Warning: + Ensure the KRL program actually sets the signal, otherwise this will + block until timeout. Consider using try/except for timeout handling. + """ + from .io_api import IOAPI + + io_api = IOAPI(self.client) + start_time = time.time() + + logging.debug(f"Waiting for signal on {group}.i{channel} (timeout: {timeout}s)") + + while (time.time() - start_time) < timeout: + if io_api.get_input(channel, group=group): + elapsed = time.time() - start_time + logging.info(f"Signal received on {group}.i{channel} after {elapsed:.3f}s") + return True + time.sleep(check_interval) + + logging.warning(f"Timeout waiting for signal on {group}.i{channel} after {timeout}s") + return False + + def signal_complete(self, channel: int, group: str = 'Digout') -> str: + """ + Signal to KRL that Python-side operation is complete. + + Sets the specified digital output HIGH to notify the KRL program that + Python has finished processing and KRL can proceed. + + Args: + channel: Output channel number to signal on (1-based) + group: I/O group name (default: 'Digout') + + Returns: + Status message indicating success + + Raises: + RSIVariableError: If the output channel doesn't exist + RSISafetyViolation: If safety checks prevent the operation + + Example: + >>> # Signal KRL that data processing is complete + >>> api.krl.signal_complete(2) + 'Signaled complete on Digout.o2' + + >>> # Typical coordination pattern: + >>> # 1. KRL sends data via Tech variables + >>> # 2. Python processes data + >>> result = api.krl.read_param('T11') # Read from KRL + >>> processed = result * 2.0 # Process + >>> api.krl.write_param('C11', processed) # Write result + >>> api.krl.signal_complete(1) # Tell KRL we're done + + Note: + This sets the output and leaves it HIGH. If you need to reset the + signal after KRL acknowledges, use api.io.pulse() instead or manually + call api.io.set_output(channel, False) after KRL reads the signal. + + See Also: + wait_for_signal() - Complementary method for waiting on inputs + api.io.pulse() - For temporary signal pulses + """ + from .io_api import IOAPI + + io_api = IOAPI(self.client) + io_api.set_output(channel, True, group=group) + logging.info(f"Signaled complete on {group}.o{channel}") + return f"Signaled complete on {group}.o{channel}" + + def write_param(self, slot: Union[int, str], value: float) -> str: + """ + Write parameter to Tech.C variable for KRL to read. + + Tech.C variables (C11-C199) are "Control" parameters written by Python + and read by KRL programs. Used for passing numerical data from Python + to the robot controller. + + Args: + slot: Tech.C slot number (11-199) or string like 'C11', 'c15' + value: Numerical value to write + + Returns: + Status message indicating success + + Raises: + ValueError: If slot number is invalid (must be 11-199) + RSIVariableError: If Tech variable group doesn't exist + RSISafetyViolation: If safety checks prevent the operation + + Example: + >>> # Send target position to KRL + >>> api.krl.write_param(11, 650.5) # Tech.C11 = 650.5 + 'Updated Tech.C11 to 650.5' + + >>> # Send multiple parameters + >>> api.krl.write_param('C12', 120.0) # X coordinate + >>> api.krl.write_param('C13', -50.0) # Y coordinate + >>> api.krl.write_param('C14', 800.0) # Z coordinate + + >>> # KRL side reads with: target_x = $TECH.C[12] + + Note: + KUKA RSI Tech variables support slots 11-199. Slots 1-10 are reserved. + The KRL program must read from $TECH.C[n] to access these values. + + KRL Example: + ```krl + DEF my_program() + DECL REAL target_x, target_y, target_z + ; Python writes to C12, C13, C14 + target_x = $TECH.C[12] + target_y = $TECH.C[13] + target_z = $TECH.C[14] + ; Use coordinates... + END + ``` + + See Also: + read_param() - Read Tech.T variables written by KRL + """ + # Normalize slot to integer + if isinstance(slot, str): + slot_str = slot.upper().strip() + if slot_str.startswith('C'): + slot_num = int(slot_str[1:]) + else: + slot_num = int(slot_str) + else: + slot_num = int(slot) + + # Validate slot range (KUKA reserves 1-10, usable range is 11-199) + if not (11 <= slot_num <= 199): + raise ValueError(f"Tech slot must be between 11-199, got {slot_num}") + + from .tools_api import ToolsAPI + + tools = ToolsAPI(self.client) + var_name = f"Tech.C{slot_num}" + result = tools.update_variable(var_name, value) + logging.debug(f"Wrote {value} to {var_name}") + return result + + def read_param(self, slot: Union[int, str]) -> float: + """ + Read parameter from Tech.T variable written by KRL. + + Tech.T variables (T11-T199) are "Transfer" parameters written by KRL + programs and read by Python. Used for passing numerical data from the + robot controller to Python. + + Args: + slot: Tech.T slot number (11-199) or string like 'T11', 't15' + + Returns: + Numerical value from the Tech.T slot + + Raises: + ValueError: If slot number is invalid (must be 11-199) + RSIVariableError: If Tech variable group doesn't exist or slot not found + + Example: + >>> # Read sensor value from KRL + >>> force = api.krl.read_param(11) # Read Tech.T11 + >>> print(f"Force reading: {force}") + Force reading: 125.5 + + >>> # Read multiple parameters + >>> actual_x = api.krl.read_param('T12') + >>> actual_y = api.krl.read_param('T13') + >>> actual_z = api.krl.read_param('T14') + + >>> # KRL side writes with: $TECH.T[12] = actual_pos.X + + Note: + Tech.T variables are updated every RSI cycle (~4ms) from the robot + controller. Values reflect the KRL program's last write operation. + + KRL Example: + ```krl + DEF my_program() + DECL E6POS actual_pos + actual_pos = $POS_ACT + ; Write to Tech.T for Python to read + $TECH.T[12] = actual_pos.X + $TECH.T[13] = actual_pos.Y + $TECH.T[14] = actual_pos.Z + END + ``` + + Warning: + Ensure the KRL program has written to the Tech.T slot before reading, + otherwise you'll receive the default value (typically 0.0). + + See Also: + write_param() - Write Tech.C variables for KRL to read + """ + from .exceptions import RSIVariableError + + # Normalize slot to integer + if isinstance(slot, str): + slot_str = slot.upper().strip() + if slot_str.startswith('T'): + slot_num = int(slot_str[1:]) + else: + slot_num = int(slot_str) + else: + slot_num = int(slot) + + # Validate slot range + if not (11 <= slot_num <= 199): + raise ValueError(f"Tech slot must be between 11-199, got {slot_num}") + + # Read from receive_variables + if 'Tech' in self.client.receive_variables: + tech_dict = self.client.receive_variables.get('Tech', {}) + var_name = f"T{slot_num}" + if isinstance(tech_dict, dict) and var_name in tech_dict: + value = tech_dict[var_name] + logging.debug(f"Read {value} from Tech.{var_name}") + return float(value) + else: + raise RSIVariableError(f"Tech.{var_name} not found in receive_variables") + else: + raise RSIVariableError("Tech variable group not found in receive_variables") diff --git a/templates/krl/README.md b/templates/krl/README.md new file mode 100644 index 0000000..0262bac --- /dev/null +++ b/templates/krl/README.md @@ -0,0 +1,393 @@ +# KRL Coordination Templates + +This directory contains KRL program templates demonstrating common Python-KRL coordination patterns using RSIPI. + +## Available Templates + +### 1. basic_handshake.src +**Simple I/O handshaking between Python and KRL** + +- KRL signals "ready" via digital output +- Python waits for signal, processes data +- Python signals "complete" via input +- KRL waits for completion, then continues + +**Use Case**: Basic synchronization, ensuring Python completes processing before KRL continues. + +**Coordination Methods Used**: +- `api.krl.wait_for_signal(channel, timeout)` +- `api.krl.signal_complete(channel)` + +### 2. parameter_passing.src +**Bidirectional numerical data exchange via Tech variables** + +- KRL writes current position to Tech.T variables +- Python reads position data +- Python calculates target and writes to Tech.C variables +- KRL reads target and executes motion + +**Use Case**: Passing numerical parameters (positions, forces, tolerances) between Python and KRL. + +**Coordination Methods Used**: +- `api.krl.read_param(slot)` - Read from Tech.T +- `api.krl.write_param(slot, value)` - Write to Tech.C +- `api.krl.wait_for_signal(channel, timeout)` +- `api.krl.signal_complete(channel)` + +### 3. state_machine.src +**Multi-state workflow with complex coordination** + +Implements a 5-state machine: +1. **IDLE**: Waiting to start +2. **CALIBRATING**: Python performing calibration +3. **READY**: Calibration complete +4. **EXECUTING**: Robot motion in progress +5. **COMPLETE**: Task finished +6. **ERROR**: Error handling state + +**Use Case**: Complex workflows requiring multiple handshakes, error handling, and state tracking. + +**Coordination Methods Used**: +- All coordination methods from basic_handshake and parameter_passing +- State variable in Tech.T[11] +- Command variable in Tech.C[11] + +## Python-KRL Coordination Patterns + +### Pattern 1: Simple Handshake + +```python +# Python side +api.krl.wait_for_signal(1) # Wait for KRL ready signal +# Do processing... +api.krl.signal_complete(1) # Signal KRL to continue +``` + +```krl +; KRL side +$OUT[1] = TRUE ; Signal ready to Python +; Wait for Python completion +WHILE $IN[1] == FALSE + WAIT SEC 0.1 +ENDWHILE +``` + +### Pattern 2: Parameter Exchange + +```python +# Python side +api.krl.wait_for_signal(1) + +# Read from KRL +value = api.krl.read_param('T11') + +# Process and write back +result = process(value) +api.krl.write_param('C11', result) + +api.krl.signal_complete(1) +``` + +```krl +; KRL side +$TECH.T[11] = some_value +$OUT[1] = TRUE ; Signal data ready + +; Wait for Python +WHILE $IN[1] == FALSE + WAIT SEC 0.1 +ENDWHILE + +; Read result +result = $TECH.C[11] +``` + +### Pattern 3: Continuous Monitoring + +```python +# Python side - non-blocking monitoring loop +api.start() + +while api.is_running(): + state = api.krl.read_param('T11') + + if state == 1: # Specific state + # React to state change + api.krl.write_param('C11', calculated_value) + api.krl.signal_complete(1) + + time.sleep(0.1) # Check every 100ms + +api.stop() +``` + +```krl +; KRL side - updates state continuously +$TECH.T[11] = current_state + +; Wait for Python response when needed +WHILE $IN[1] == FALSE + WAIT SEC 0.1 +ENDWHILE + +calculated = $TECH.C[11] +``` + +## Tech Variable Conventions + +### Tech.C Variables (Python → KRL) +**"Control" variables - Python writes, KRL reads** + +| Slot | Description | Example Usage | +|------|-------------|---------------| +| C11 | Command/state | 0=continue, 1=pause, 2=abort | +| C12-C14 | Position offsets | X, Y, Z corrections | +| C15-C17 | Target position | Calculated target coordinates | +| C18-C20 | Process parameters | Speed, force, tolerance | +| C21+ | Custom parameters | Application-specific data | + +```python +# Python writes +api.krl.write_param('C11', 0) # Command: continue +api.krl.write_param('C12', 5.0) # X offset +api.krl.write_param('C13', -2.0) # Y offset +``` + +```krl +; KRL reads +command = $TECH.C[11] +offset_x = $TECH.C[12] +offset_y = $TECH.C[13] +``` + +### Tech.T Variables (KRL → Python) +**"Transfer" variables - KRL writes, Python reads** + +| Slot | Description | Example Usage | +|------|-------------|---------------| +| T11 | Current state | State machine state number | +| T12-T14 | Current position | X, Y, Z coordinates | +| T15-T17 | Force/torque | Measured forces | +| T18-T20 | Sensor readings | External sensor data | +| T21+ | Custom data | Application-specific values | + +```krl +; KRL writes +$TECH.T[11] = current_state +$TECH.T[12] = $POS_ACT.X +$TECH.T[13] = $POS_ACT.Y +$TECH.T[14] = $POS_ACT.Z +``` + +```python +# Python reads +state = api.krl.read_param('T11') +pos_x = api.krl.read_param('T12') +pos_y = api.krl.read_param('T13') +pos_z = api.krl.read_param('T14') +``` + +## I/O Signal Conventions + +### Standard I/O Mapping + +| Signal | Type | Purpose | +|--------|------|---------| +| $OUT[1] | Output | KRL → Python state/ready signal | +| $IN[1] | Input | Python → KRL completion acknowledgement | +| $IN[2] | Input | Python → KRL error signal | +| $OUT[2] | Output | KRL → Python auxiliary signal | + +```python +# Python I/O methods +api.krl.wait_for_signal(1) # Wait for $OUT[1] +api.krl.signal_complete(1) # Set $IN[1] +api.io.set_output(2, True) # Control $OUT[2] +``` + +## Error Handling Best Practices + +### Timeouts + +**Always use timeouts to prevent indefinite blocking:** + +```python +# Python +if not api.krl.wait_for_signal(1, timeout=10.0): + print("Timeout waiting for KRL!") + # Handle error +``` + +```krl +; KRL +INT counter +counter = 0 +WHILE ($IN[1] == FALSE) AND (counter < 100) + WAIT SEC 0.1 + counter = counter + 1 +ENDWHILE + +IF counter >= 100 THEN + ; Timeout - handle error + HALT +ENDIF +``` + +### Error Signaling + +**Use dedicated error channels:** + +```python +# Python detects error +if error_condition: + api.io.set_output(2, True) # Signal error to KRL +``` + +```krl +; KRL checks for errors +IF $IN[2] == TRUE THEN + ; Python signaled error + HALT +ENDIF +``` + +## Integration with RSI Motion Control + +All coordination patterns work seamlessly with RSI real-time motion corrections: + +```python +# Python coordinates with KRL AND sends real-time corrections +api.start() + +# Wait for KRL to start motion phase +api.krl.wait_for_signal(1) + +# Send real-time corrections during KRL motion +for i in range(100): + correction = calculate_correction() + api.motion.update_cartesian(X=correction) + time.sleep(0.004) # 250Hz update rate + +# Signal motion phase complete +api.krl.signal_complete(1) + +api.stop() +``` + +```krl +; KRL executes motion while Python sends corrections +$OUT[1] = TRUE ; Signal motion start + +; Python sends corrections via RSI during this move +LIN target_pos Vel=0.5 m/s CPDAT1 Tool[1] Base[0] + +; Wait for Python to finish corrections +WHILE $IN[1] == FALSE + WAIT SEC 0.1 +ENDWHILE +``` + +## Testing Templates + +To test these templates: + +1. **Upload KRL program to robot controller** +2. **Start Python coordination script** +3. **Execute KRL program on teach pendant** +4. **Monitor coordination in Python logs** + +Example Python test script: + +```python +from RSIPI import RSIAPI +import time + +api = RSIAPI('RSI_EthernetConfig.xml') +api.start() + +try: + print("Waiting for KRL ready signal...") + + if api.krl.wait_for_signal(1, timeout=30.0): + print("✅ KRL signaled ready!") + + # Simulate processing + time.sleep(2.0) + print("Processing complete") + + # Signal back to KRL + api.krl.signal_complete(1) + print("✅ Signaled KRL to continue") + + else: + print("❌ Timeout waiting for KRL") + +except KeyboardInterrupt: + print("\n⚠️ Interrupted by user") + +finally: + api.stop() + print("API stopped") +``` + +## Troubleshooting + +### Signal Not Received + +**Check I/O configuration in RSI XML:** +```xml + + + + + + + + + + +``` + +### Tech Variable Not Found + +**Ensure Tech variables are configured in RSI XML:** +```xml + + + + + + + + + + +``` + +### Timing Issues + +- **Reduce check_interval for faster response**: `api.krl.wait_for_signal(1, check_interval=0.005)` +- **Increase timeout for slow operations**: `api.krl.wait_for_signal(1, timeout=60.0)` +- **Add WAIT SEC delays in KRL for signal propagation** + +## Next Steps + +After understanding these templates: + +1. **Adapt templates** to your specific application +2. **Test coordination** patterns with your robot +3. **Implement error recovery** mechanisms +4. **Document custom** coordination protocols +5. **Create application-specific** state machines + +## References + +- [RSIPI Documentation](../../README.md) +- [Phase 3 Summary](../../PHASE_3_SUMMARY.md) (when available) +- [KUKA RSI 3.3 Documentation](https://www.kuka.com) + +--- + +**Template Author**: RSIPI Development Team +**Last Updated**: January 17, 2026 +**Version**: 1.0 diff --git a/templates/krl/basic_handshake.src b/templates/krl/basic_handshake.src new file mode 100644 index 0000000..a2db7ba --- /dev/null +++ b/templates/krl/basic_handshake.src @@ -0,0 +1,80 @@ +&ACCESS RVP +&REL 1 +&PARAM TEMPLATE = C:\KRC\Roboter\Template\vorgabe +&PARAM EDITMASK = * +DEF basic_handshake() +; ================================================================ +; Basic I/O Handshake Pattern +; ================================================================ +; Demonstrates simple bidirectional signaling between KRL and Python +; via digital I/O channels. +; +; Python-KRL Coordination Flow: +; 1. KRL signals "ready" to Python (output 1) +; 2. Python waits for signal, then processes +; 3. Python signals "complete" to KRL (input 1) +; 4. KRL waits for completion signal, then continues +; +; Python Code Example: +; api = RSIAPI('RSI_EthernetConfig.xml') +; api.start() +; +; # Wait for KRL to signal ready +; if api.krl.wait_for_signal(1, timeout=10.0): +; print("KRL is ready!") +; +; # Do Python processing here +; time.sleep(1.0) +; +; # Signal completion back to KRL +; api.krl.signal_complete(1) +; +; api.stop() +; ================================================================ + + ; Variable declarations + INT i + BOOL python_ready + + BAS(#INITMOV, 0) ; Initialize motion + + ; ============================================================ + ; STEP 1: KRL signals ready to Python + ; ============================================================ + $OUT[1] = TRUE ; Signal ready on digital output 1 + WAIT SEC 0.1 ; Brief delay for signal propagation + + ; ============================================================ + ; STEP 2: Wait for Python to acknowledge completion + ; ============================================================ + python_ready = FALSE + i = 0 + + ; Poll input 1 until Python signals completion (max 10 seconds) + WHILE (python_ready == FALSE) AND (i < 100) + IF $IN[1] == TRUE THEN + python_ready = TRUE + ELSE + WAIT SEC 0.1 + i = i + 1 + ENDIF + ENDWHILE + + IF python_ready == TRUE THEN + ; Python signaled completion successfully + ; Safe to proceed with robot motion + + ; Reset handshake signals + $OUT[1] = FALSE + + ; Example motion with Python coordination + PTP HOME Vel=100 % DEFAULT + ; Python can send corrections during motion via RSI + + ELSE + ; Timeout - Python did not respond + ; Halt program for safety + HALT + ENDIF + +END diff --git a/templates/krl/parameter_passing.src b/templates/krl/parameter_passing.src new file mode 100644 index 0000000..8904824 --- /dev/null +++ b/templates/krl/parameter_passing.src @@ -0,0 +1,125 @@ +&ACCESS RVP +&REL 1 +&PARAM TEMPLATE = C:\KRC\Roboter\Template\vorgabe +&PARAM EDITMASK = * +DEF parameter_passing() +; ================================================================ +; Parameter Passing via Tech Variables +; ================================================================ +; Demonstrates bidirectional numerical data exchange between KRL +; and Python using RSI Tech variables. +; +; Tech Variable Conventions: +; - Tech.C[11-199]: Python writes, KRL reads (Control variables) +; - Tech.T[11-199]: KRL writes, Python reads (Transfer variables) +; +; Python-KRL Coordination Flow: +; 1. KRL writes current position to Tech.T +; 2. Python reads position from Tech.T +; 3. Python processes data and writes target to Tech.C +; 4. KRL reads target from Tech.C and executes motion +; +; Python Code Example: +; api = RSIAPI('RSI_EthernetConfig.xml') +; api.start() +; +; # Wait for KRL to signal data ready +; api.krl.wait_for_signal(1) +; +; # Read current position from KRL +; current_x = api.krl.read_param('T11') +; current_y = api.krl.read_param('T12') +; current_z = api.krl.read_param('T13') +; +; # Calculate target (e.g., move 100mm in X) +; target_x = current_x + 100.0 +; target_y = current_y +; target_z = current_z +; +; # Write target back to KRL +; api.krl.write_param('C11', target_x) +; api.krl.write_param('C12', target_y) +; api.krl.write_param('C13', target_z) +; +; # Signal completion +; api.krl.signal_complete(1) +; +; api.stop() +; ================================================================ + + ; Variable declarations + E6POS current_pos, target_pos + REAL target_x, target_y, target_z + INT i + BOOL data_ready + + BAS(#INITMOV, 0) + + ; ============================================================ + ; STEP 1: KRL writes current position to Tech.T + ; ============================================================ + current_pos = $POS_ACT ; Get current TCP position + + ; Write position components to Transfer variables (Python reads) + $TECH.T[11] = current_pos.X + $TECH.T[12] = current_pos.Y + $TECH.T[13] = current_pos.Z + $TECH.T[14] = current_pos.A + $TECH.T[15] = current_pos.B + $TECH.T[16] = current_pos.C + + ; ============================================================ + ; STEP 2: Signal Python that data is ready + ; ============================================================ + $OUT[1] = TRUE + WAIT SEC 0.1 + + ; ============================================================ + ; STEP 3: Wait for Python to process and write target + ; ============================================================ + data_ready = FALSE + i = 0 + + ; Poll input 1 for Python completion signal + WHILE (data_ready == FALSE) AND (i < 100) + IF $IN[1] == TRUE THEN + data_ready = TRUE + ELSE + WAIT SEC 0.1 + i = i + 1 + ENDIF + ENDWHILE + + IF data_ready == TRUE THEN + ; ============================================================ + ; STEP 4: Read target position from Tech.C + ; ============================================================ + ; Python has written to Control variables (KRL reads) + target_x = $TECH.C[11] + target_y = $TECH.C[12] + target_z = $TECH.C[13] + + ; Construct target pose + target_pos = current_pos ; Copy orientation + target_pos.X = target_x + target_pos.Y = target_y + target_pos.Z = target_z + + ; ============================================================ + ; STEP 5: Execute motion to target + ; ============================================================ + ; Reset signals + $OUT[1] = FALSE + + ; Move to calculated target + LIN target_pos Vel=0.5 m/s CPDAT1 Tool[1] Base[0] + + ; Success indication + WAIT SEC 0.5 + + ELSE + ; Timeout - halt for safety + HALT + ENDIF + +END diff --git a/templates/krl/state_machine.src b/templates/krl/state_machine.src new file mode 100644 index 0000000..6245e67 --- /dev/null +++ b/templates/krl/state_machine.src @@ -0,0 +1,189 @@ +&ACCESS RVP +&REL 1 +&PARAM TEMPLATE = C:\KRC\Roboter\Template\vorgabe +&PARAM EDITMASK = * +DEF state_machine() +; ================================================================ +; Multi-State Coordination Pattern +; ================================================================ +; Demonstrates a state machine for complex Python-KRL workflows. +; Uses both I/O signals and Tech variables for robust coordination. +; +; State Definitions: +; State 0: IDLE - Waiting to start +; State 1: CALIBRATING - Python performing calibration +; State 2: READY - Calibration complete, ready for motion +; State 3: EXECUTING - Robot executing motion task +; State 4: COMPLETE - Task finished +; State 9: ERROR - Error condition +; +; I/O Mapping: +; $OUT[1]: State signal to Python (pulse indicates state change) +; $IN[1]: Python acknowledgement +; $IN[2]: Python error signal +; +; Tech Variable Mapping: +; Tech.T[11]: Current state (KRL → Python) +; Tech.C[11]: Python command (0=continue, 1=pause, 2=abort) +; Tech.C[12-14]: Calibration offset (X, Y, Z) +; +; Python Code Example: +; api = RSIAPI('RSI_EthernetConfig.xml') +; api.start() +; +; # Monitor state transitions +; while True: +; state = api.krl.read_param('T11') +; +; if state == 1: # CALIBRATING +; print("Performing calibration...") +; # Calibration logic here +; offset_x = 5.0 +; offset_y = -2.0 +; offset_z = 0.5 +; +; # Write calibration results +; api.krl.write_param('C12', offset_x) +; api.krl.write_param('C13', offset_y) +; api.krl.write_param('C14', offset_z) +; +; # Acknowledge state completion +; api.krl.signal_complete(1) +; +; elif state == 3: # EXECUTING +; print("Robot executing motion...") +; # Monitor execution, send corrections if needed +; api.motion.update_cartesian(X=offset_x, Y=offset_y) +; +; elif state == 4: # COMPLETE +; print("Task complete!") +; break +; +; elif state == 9: # ERROR +; print("Error detected, aborting!") +; break +; +; time.sleep(0.1) +; +; api.stop() +; ================================================================ + + ; Variable declarations + INT current_state, python_cmd + REAL offset_x, offset_y, offset_z + E6POS target_pos + BOOL ack_received + INT timeout_counter + + BAS(#INITMOV, 0) + + ; Initialize state machine + current_state = 0 ; IDLE + $TECH.T[11] = current_state + + ; ============================================================ + ; STATE 0: IDLE + ; ============================================================ + current_state = 0 + $TECH.T[11] = current_state + WAIT SEC 1.0 + + ; ============================================================ + ; STATE 1: CALIBRATING + ; ============================================================ + current_state = 1 + $TECH.T[11] = current_state + + ; Signal Python to start calibration + $OUT[1] = TRUE + WAIT SEC 0.1 + $OUT[1] = FALSE + + ; Wait for Python calibration completion + ack_received = FALSE + timeout_counter = 0 + + WHILE (ack_received == FALSE) AND (timeout_counter < 200) + ; Check for error signal + IF $IN[2] == TRUE THEN + current_state = 9 ; ERROR + $TECH.T[11] = current_state + HALT + ENDIF + + ; Check for completion + IF $IN[1] == TRUE THEN + ack_received = TRUE + ELSE + WAIT SEC 0.1 + timeout_counter = timeout_counter + 1 + ENDIF + ENDWHILE + + IF ack_received == FALSE THEN + ; Timeout error + current_state = 9 + $TECH.T[11] = current_state + HALT + ENDIF + + ; ============================================================ + ; STATE 2: READY + ; ============================================================ + ; Read calibration offsets from Python + offset_x = $TECH.C[12] + offset_y = $TECH.C[13] + offset_z = $TECH.C[14] + + current_state = 2 + $TECH.T[11] = current_state + + ; Signal ready + $OUT[1] = TRUE + WAIT SEC 0.1 + $OUT[1] = FALSE + WAIT SEC 0.5 + + ; ============================================================ + ; STATE 3: EXECUTING + ; ============================================================ + current_state = 3 + $TECH.T[11] = current_state + + ; Calculate target with calibration offset + target_pos = $POS_ACT + target_pos.X = target_pos.X + offset_x + target_pos.Y = target_pos.Y + offset_y + target_pos.Z = target_pos.Z + offset_z + + ; Execute motion (Python can send RSI corrections during this) + LIN target_pos Vel=0.3 m/s CPDAT1 Tool[1] Base[0] + + ; Check for pause/abort commands + python_cmd = $TECH.C[11] + IF python_cmd == 2 THEN + ; Abort requested + current_state = 9 + $TECH.T[11] = current_state + HALT + ENDIF + + ; ============================================================ + ; STATE 4: COMPLETE + ; ============================================================ + current_state = 4 + $TECH.T[11] = current_state + + ; Signal completion + $OUT[1] = TRUE + WAIT SEC 0.2 + $OUT[1] = FALSE + + ; Return to home + PTP HOME Vel=100 % DEFAULT + + ; Final state update + current_state = 0 ; Back to IDLE + $TECH.T[11] = current_state + +END