diff --git a/examples/coordination/01_basic_handshake.py b/examples/coordination/01_basic_handshake.py
new file mode 100644
index 0000000..efcf2e9
--- /dev/null
+++ b/examples/coordination/01_basic_handshake.py
@@ -0,0 +1,113 @@
+"""
+Basic I/O Handshake Example
+
+Demonstrates simple bidirectional signaling between Python and KRL using
+digital I/O channels. Works with templates/krl/basic_handshake.src
+
+Flow:
+1. KRL signals "ready" on output 1
+2. Python waits for signal
+3. Python performs processing
+4. Python signals "complete" on input 1
+5. KRL continues
+
+Usage:
+ python 01_basic_handshake.py --config RSI_EthernetConfig.xml
+"""
+
+import argparse
+import time
+import logging
+from RSIPI import RSIAPI
+
+# Configure logging
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s'
+)
+
+
+def basic_handshake_example(config_file: str) -> None:
+ """
+ Execute basic I/O handshake with KRL program.
+
+ Args:
+ config_file: Path to RSI configuration XML file
+ """
+ # Initialize API
+ api = RSIAPI(config_file)
+
+ try:
+ # Start RSI communication
+ logging.info("Starting RSI communication...")
+ api.start()
+ logging.info("✅ RSI started successfully")
+
+ # Wait for KRL to signal ready (digital output 1)
+ logging.info("Waiting for KRL ready signal...")
+
+ if api.krl.wait_for_signal(1, timeout=30.0):
+ logging.info("✅ KRL signaled ready!")
+
+ # Simulate Python processing (e.g., data analysis, sensor reading)
+ logging.info("Performing Python-side processing...")
+ time.sleep(2.0) # Simulated processing time
+
+ # Optional: Do actual work here
+ # process_sensor_data()
+ # calculate_corrections()
+ # update_database()
+
+ logging.info("✅ Processing complete")
+
+ # Signal completion back to KRL (digital input 1)
+ api.krl.signal_complete(1)
+ logging.info("✅ Signaled KRL to continue")
+
+ # KRL will now proceed with its motion program
+ logging.info("KRL is now free to continue motion")
+
+ else:
+ logging.error("❌ Timeout waiting for KRL ready signal")
+ logging.error("Check that KRL program is running and I/O is configured correctly")
+
+ except KeyboardInterrupt:
+ logging.warning("\n⚠️ Interrupted by user")
+
+ except Exception as e:
+ logging.error(f"❌ Error during handshake: {e}")
+
+ finally:
+ # Clean shutdown
+ logging.info("Stopping RSI communication...")
+ api.stop()
+ logging.info("✅ API stopped successfully")
+
+
+def main():
+ """Main entry point."""
+ parser = argparse.ArgumentParser(description='Basic I/O Handshake Example')
+ parser.add_argument(
+ '--config',
+ type=str,
+ default='RSI_EthernetConfig.xml',
+ help='Path to RSI configuration file'
+ )
+
+ args = parser.parse_args()
+
+ logging.info("=" * 60)
+ logging.info("RSIPI - Basic I/O Handshake Example")
+ logging.info("=" * 60)
+ logging.info(f"Config: {args.config}")
+ logging.info("=" * 60)
+
+ basic_handshake_example(args.config)
+
+ logging.info("=" * 60)
+ logging.info("Example complete!")
+ logging.info("=" * 60)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/coordination/02_parameter_passing.py b/examples/coordination/02_parameter_passing.py
new file mode 100644
index 0000000..1000488
--- /dev/null
+++ b/examples/coordination/02_parameter_passing.py
@@ -0,0 +1,133 @@
+"""
+Parameter Passing Example
+
+Demonstrates bidirectional numerical data exchange between Python and KRL
+using RSI Tech variables. Works with templates/krl/parameter_passing.src
+
+Flow:
+1. KRL writes current position to Tech.T11-T16
+2. Python waits for data ready signal
+3. Python reads position from Tech.T
+4. Python calculates target and writes to Tech.C11-C13
+5. Python signals completion
+6. KRL reads target from Tech.C and executes motion
+
+Usage:
+ python 02_parameter_passing.py --config RSI_EthernetConfig.xml
+"""
+
+import argparse
+import logging
+from RSIPI import RSIAPI
+
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s'
+)
+
+
+def parameter_passing_example(config_file: str) -> None:
+ """
+ Execute parameter passing coordination with KRL program.
+
+ Reads current position from KRL, calculates target position,
+ and sends target back to KRL for execution.
+
+ Args:
+ config_file: Path to RSI configuration XML file
+ """
+ api = RSIAPI(config_file)
+
+ try:
+ logging.info("Starting RSI communication...")
+ api.start()
+ logging.info("✅ RSI started successfully")
+
+ # Wait for KRL to signal that data is ready
+ logging.info("Waiting for KRL data ready signal...")
+
+ if api.krl.wait_for_signal(1, timeout=30.0):
+ logging.info("✅ KRL signaled data ready!")
+
+ # Read current position from Tech.T variables
+ logging.info("Reading current position from KRL...")
+ current_x = api.krl.read_param('T11')
+ current_y = api.krl.read_param('T12')
+ current_z = api.krl.read_param('T13')
+ current_a = api.krl.read_param('T14')
+ current_b = api.krl.read_param('T15')
+ current_c = api.krl.read_param('T16')
+
+ logging.info(f"Current position:")
+ logging.info(f" X: {current_x:.2f} mm")
+ logging.info(f" Y: {current_y:.2f} mm")
+ logging.info(f" Z: {current_z:.2f} mm")
+ logging.info(f" A: {current_a:.2f}°, B: {current_b:.2f}°, C: {current_c:.2f}°")
+
+ # Calculate target position (example: move 100mm in X, 50mm in Y)
+ logging.info("Calculating target position...")
+ target_x = current_x + 100.0 # Move 100mm in X
+ target_y = current_y + 50.0 # Move 50mm in Y
+ target_z = current_z + 0.0 # Keep Z constant
+
+ logging.info(f"Calculated target:")
+ logging.info(f" X: {target_x:.2f} mm (+100mm)")
+ logging.info(f" Y: {target_y:.2f} mm (+50mm)")
+ logging.info(f" Z: {target_z:.2f} mm (no change)")
+
+ # Write target position to Tech.C variables for KRL to read
+ logging.info("Writing target position to KRL...")
+ api.krl.write_param('C11', target_x)
+ api.krl.write_param('C12', target_y)
+ api.krl.write_param('C13', target_z)
+ logging.info("✅ Target position written to Tech.C")
+
+ # Signal KRL that calculation is complete
+ api.krl.signal_complete(1)
+ logging.info("✅ Signaled KRL that target is ready")
+
+ # KRL will now read target and execute motion
+ logging.info("KRL will now execute motion to calculated target")
+
+ else:
+ logging.error("❌ Timeout waiting for KRL data ready signal")
+
+ except KeyboardInterrupt:
+ logging.warning("\n⚠️ Interrupted by user")
+
+ except Exception as e:
+ logging.error(f"❌ Error during parameter passing: {e}")
+
+ finally:
+ logging.info("Stopping RSI communication...")
+ api.stop()
+ logging.info("✅ API stopped successfully")
+
+
+def main():
+ """Main entry point."""
+ parser = argparse.ArgumentParser(description='Parameter Passing Example')
+ parser.add_argument(
+ '--config',
+ type=str,
+ default='RSI_EthernetConfig.xml',
+ help='Path to RSI configuration file'
+ )
+
+ args = parser.parse_args()
+
+ logging.info("=" * 60)
+ logging.info("RSIPI - Parameter Passing Example")
+ logging.info("=" * 60)
+ logging.info(f"Config: {args.config}")
+ logging.info("=" * 60)
+
+ parameter_passing_example(args.config)
+
+ logging.info("=" * 60)
+ logging.info("Example complete!")
+ logging.info("=" * 60)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/coordination/03_state_machine.py b/examples/coordination/03_state_machine.py
new file mode 100644
index 0000000..360639c
--- /dev/null
+++ b/examples/coordination/03_state_machine.py
@@ -0,0 +1,208 @@
+"""
+State Machine Coordination Example
+
+Demonstrates a multi-state coordination workflow between Python and KRL.
+Works with templates/krl/state_machine.src
+
+States:
+ 0: IDLE - Waiting to start
+ 1: CALIBRATING - Python performing calibration
+ 2: READY - Calibration complete, ready for motion
+ 3: EXECUTING - Robot executing motion task
+ 4: COMPLETE - Task finished
+ 9: ERROR - Error condition
+
+Usage:
+ python 03_state_machine.py --config RSI_EthernetConfig.xml
+"""
+
+import argparse
+import time
+import logging
+from enum import IntEnum
+from RSIPI import RSIAPI
+
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s'
+)
+
+
+class State(IntEnum):
+ """State machine states matching KRL program."""
+ IDLE = 0
+ CALIBRATING = 1
+ READY = 2
+ EXECUTING = 3
+ COMPLETE = 4
+ ERROR = 9
+
+
+def perform_calibration() -> tuple[float, float, float]:
+ """
+ Perform calibration routine.
+
+ In a real application, this would:
+ - Read sensor data
+ - Analyze calibration target
+ - Calculate position offsets
+ - Validate results
+
+ Returns:
+ Tuple of (offset_x, offset_y, offset_z) in mm
+ """
+ logging.info("Performing calibration routine...")
+
+ # Simulate calibration processing
+ time.sleep(2.0)
+
+ # Example calibration results (in real app, calculated from sensors)
+ offset_x = 5.0
+ offset_y = -2.0
+ offset_z = 0.5
+
+ logging.info(f"Calibration complete:")
+ logging.info(f" Offset X: {offset_x:+.2f} mm")
+ logging.info(f" Offset Y: {offset_y:+.2f} mm")
+ logging.info(f" Offset Z: {offset_z:+.2f} mm")
+
+ return (offset_x, offset_y, offset_z)
+
+
+def state_machine_example(config_file: str) -> None:
+ """
+ Execute state machine coordination with KRL program.
+
+ Monitors KRL state transitions and responds appropriately
+ to each state change.
+
+ Args:
+ config_file: Path to RSI configuration XML file
+ """
+ api = RSIAPI(config_file)
+
+ try:
+ logging.info("Starting RSI communication...")
+ api.start()
+ logging.info("✅ RSI started successfully")
+
+ # Main state monitoring loop
+ logging.info("Monitoring state machine...")
+ last_state = None
+ calibration_offsets = (0.0, 0.0, 0.0)
+
+ while True:
+ # Read current state from KRL
+ current_state = int(api.krl.read_param('T11'))
+
+ # Only log state changes
+ if current_state != last_state:
+ logging.info(f"State changed: {State(last_state or 0).name} → {State(current_state).name}")
+ last_state = current_state
+
+ # Handle each state
+ if current_state == State.IDLE:
+ # Waiting for KRL to start
+ logging.debug("Waiting in IDLE state...")
+ time.sleep(0.5)
+
+ elif current_state == State.CALIBRATING:
+ logging.info("✅ State: CALIBRATING - Starting calibration")
+
+ # Perform calibration
+ calibration_offsets = perform_calibration()
+
+ # Write calibration results to Tech.C for KRL
+ logging.info("Writing calibration offsets to KRL...")
+ api.krl.write_param('C12', calibration_offsets[0]) # X offset
+ api.krl.write_param('C13', calibration_offsets[1]) # Y offset
+ api.krl.write_param('C14', calibration_offsets[2]) # Z offset
+
+ # Signal calibration complete
+ api.krl.signal_complete(1)
+ logging.info("✅ Calibration complete, signaled KRL")
+
+ elif current_state == State.READY:
+ logging.info("✅ State: READY - System ready for motion")
+ # KRL is ready to execute, no action needed from Python
+ time.sleep(0.1)
+
+ elif current_state == State.EXECUTING:
+ logging.info("✅ State: EXECUTING - Robot in motion")
+
+ # Monitor execution (could send real-time corrections here)
+ # Example: Send RSI corrections based on sensor feedback
+ # api.motion.update_cartesian(X=calibration_offsets[0])
+
+ # For this example, just monitor
+ time.sleep(0.1)
+
+ elif current_state == State.COMPLETE:
+ logging.info("✅ State: COMPLETE - Task finished successfully!")
+ logging.info("State machine workflow complete")
+ break # Exit loop, task is done
+
+ elif current_state == State.ERROR:
+ logging.error("❌ State: ERROR - Error detected in KRL program!")
+ logging.error("Aborting state machine workflow")
+ break # Exit loop, error occurred
+
+ else:
+ logging.warning(f"⚠️ Unknown state: {current_state}")
+ time.sleep(0.1)
+
+ # Prevent tight loop
+ time.sleep(0.05) # Check state every 50ms
+
+ logging.info("State machine monitoring ended")
+
+ except KeyboardInterrupt:
+ logging.warning("\n⚠️ Interrupted by user")
+ # Signal error to KRL
+ try:
+ api.io.set_output(2, True) # Error signal
+ logging.info("Signaled error to KRL")
+ except:
+ pass
+
+ except Exception as e:
+ logging.error(f"❌ Error during state machine: {e}")
+ # Signal error to KRL
+ try:
+ api.io.set_output(2, True)
+ except:
+ pass
+
+ finally:
+ logging.info("Stopping RSI communication...")
+ api.stop()
+ logging.info("✅ API stopped successfully")
+
+
+def main():
+ """Main entry point."""
+ parser = argparse.ArgumentParser(description='State Machine Coordination Example')
+ parser.add_argument(
+ '--config',
+ type=str,
+ default='RSI_EthernetConfig.xml',
+ help='Path to RSI configuration file'
+ )
+
+ args = parser.parse_args()
+
+ logging.info("=" * 60)
+ logging.info("RSIPI - State Machine Coordination Example")
+ logging.info("=" * 60)
+ logging.info(f"Config: {args.config}")
+ logging.info("=" * 60)
+
+ state_machine_example(args.config)
+
+ logging.info("=" * 60)
+ logging.info("Example complete!")
+ logging.info("=" * 60)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/coordination/README.md b/examples/coordination/README.md
new file mode 100644
index 0000000..840785f
--- /dev/null
+++ b/examples/coordination/README.md
@@ -0,0 +1,345 @@
+# Python-KRL Coordination Examples
+
+This directory contains Python examples demonstrating Python-KRL coordination patterns using RSIPI Phase 3 features.
+
+## Prerequisites
+
+- RSIPI library installed (`pip install -e .` from rsi-pi directory)
+- KUKA robot controller with RSI 3.3 configured
+- RSI_EthernetConfig.xml configured with required I/O and Tech variables
+- Corresponding KRL programs uploaded to robot controller (see `templates/krl/`)
+
+## Examples
+
+### 01_basic_handshake.py
+**Simple I/O handshaking**
+
+Demonstrates basic bidirectional signaling using digital I/O channels.
+
+**Requires**: `templates/krl/basic_handshake.src` running on robot
+
+**Run**:
+```bash
+python 01_basic_handshake.py --config path/to/RSI_EthernetConfig.xml
+```
+
+**Flow**:
+1. Start Python script
+2. Execute KRL program on teach pendant
+3. Python waits for KRL ready signal
+4. Python performs processing
+5. Python signals completion
+6. KRL continues with motion
+
+**API Features Demonstrated**:
+- `api.krl.wait_for_signal(channel, timeout)`
+- `api.krl.signal_complete(channel)`
+
+---
+
+### 02_parameter_passing.py
+**Bidirectional parameter exchange**
+
+Demonstrates numerical data exchange using RSI Tech variables.
+
+**Requires**: `templates/krl/parameter_passing.src` running on robot
+
+**Run**:
+```bash
+python 02_parameter_passing.py --config path/to/RSI_EthernetConfig.xml
+```
+
+**Flow**:
+1. KRL writes current position to Tech.T variables
+2. Python reads position data
+3. Python calculates target position
+4. Python writes target to Tech.C variables
+5. Python signals completion
+6. KRL reads target and executes motion
+
+**API Features Demonstrated**:
+- `api.krl.read_param(slot)` - Read Tech.T variables
+- `api.krl.write_param(slot, value)` - Write Tech.C variables
+- `api.krl.wait_for_signal(channel, timeout)`
+- `api.krl.signal_complete(channel)`
+
+---
+
+### 03_state_machine.py
+**Multi-state workflow coordination**
+
+Demonstrates complex state machine with error handling and calibration.
+
+**Requires**: `templates/krl/state_machine.src` running on robot
+
+**Run**:
+```bash
+python 03_state_machine.py --config path/to/RSI_EthernetConfig.xml
+```
+
+**Flow**:
+1. Python monitors state variable (Tech.T11)
+2. Calibration state: Python performs calibration routine
+3. Python writes calibration offsets to Tech.C
+4. Executing state: KRL uses offsets for motion
+5. Complete state: Workflow finishes successfully
+
+**States**:
+- 0: IDLE - Waiting to start
+- 1: CALIBRATING - Python calibration in progress
+- 2: READY - Ready for motion
+- 3: EXECUTING - Robot in motion
+- 4: COMPLETE - Task finished
+- 9: ERROR - Error condition
+
+**API Features Demonstrated**:
+- All coordination methods from examples 01 and 02
+- State monitoring loop
+- Error handling with I/O signals
+- Real-time state transitions
+
+---
+
+## Configuration Requirements
+
+### RSI XML Configuration
+
+Your `RSI_EthernetConfig.xml` must include the following elements:
+
+**Digital I/O:**
+```xml
+
+
+
+
+
+
+
+
+
+
+```
+
+**Tech Variables:**
+```xml
+
+
+
+
+
+
+
+
+
+
+```
+
+### Network Settings
+
+Ensure your RSI network settings match:
+```xml
+192.168.1.100
+49152
+ImFree
+```
+
+## Running Examples
+
+### Step 1: Start Python Script
+
+```bash
+# In one terminal
+cd examples/coordination
+python 01_basic_handshake.py --config ../../RSI_EthernetConfig.xml
+```
+
+### Step 2: Execute KRL Program
+
+1. Upload corresponding KRL program to robot controller
+2. Switch to AUTO mode on teach pendant
+3. Select and execute the KRL program
+4. Monitor coordination in Python terminal
+
+### Step 3: Monitor Output
+
+Python will log:
+- State transitions
+- I/O signal changes
+- Parameter reads/writes
+- Errors and warnings
+
+**Example Output**:
+```
+2026-01-17 14:32:01 - INFO - Starting RSI communication...
+2026-01-17 14:32:01 - INFO - ✅ RSI started successfully
+2026-01-17 14:32:01 - INFO - Waiting for KRL ready signal...
+2026-01-17 14:32:05 - INFO - ✅ KRL signaled ready!
+2026-01-17 14:32:05 - INFO - Performing Python-side processing...
+2026-01-17 14:32:07 - INFO - ✅ Processing complete
+2026-01-17 14:32:07 - INFO - ✅ Signaled KRL to continue
+```
+
+## Customizing Examples
+
+### Modify Processing Logic
+
+In `01_basic_handshake.py`:
+```python
+# Replace simulated processing
+time.sleep(2.0)
+
+# With actual processing
+sensor_data = read_sensor()
+processed_result = analyze_data(sensor_data)
+update_database(processed_result)
+```
+
+### Change Calculation Logic
+
+In `02_parameter_passing.py`:
+```python
+# Modify target calculation
+target_x = current_x + 100.0 # Original
+target_x = calculate_adaptive_target(current_x, sensor_feedback) # Custom
+```
+
+### Extend State Machine
+
+In `03_state_machine.py`:
+```python
+# Add new states
+class State(IntEnum):
+ IDLE = 0
+ CALIBRATING = 1
+ INSPECTING = 2 # NEW STATE
+ READY = 3 # Renumber subsequent states
+ # ...
+
+# Handle new state
+elif current_state == State.INSPECTING:
+ inspection_result = perform_inspection()
+ api.krl.write_param('C20', inspection_result)
+ api.krl.signal_complete(1)
+```
+
+## Troubleshooting
+
+### "Timeout waiting for KRL signal"
+
+**Problem**: Python doesn't receive expected I/O signal from KRL
+
+**Solutions**:
+1. Verify KRL program is running on robot
+2. Check I/O configuration in RSI XML
+3. Verify network connectivity (ping robot IP)
+4. Check signal mapping ($OUT[1] → Digout.o1)
+5. Increase timeout: `api.krl.wait_for_signal(1, timeout=60.0)`
+
+### "RSIVariableError: Tech.T11 not found"
+
+**Problem**: Tech variable not in receive_variables
+
+**Solutions**:
+1. Add Tech variables to RSI XML `` section
+2. Restart robot controller after XML changes
+3. Verify variable configuration: `api.tools.show_variables()`
+
+### "Connection refused"
+
+**Problem**: Cannot connect to robot controller
+
+**Solutions**:
+1. Check robot IP address in RSI XML
+2. Verify robot is in correct mode (T1/T2/AUTO)
+3. Ensure firewall allows UDP port 49152
+4. Check RSI is enabled on robot controller
+
+### KRL Program Halts
+
+**Problem**: KRL program stops unexpectedly
+
+**Solutions**:
+1. Check KRL timeout values (increase if needed)
+2. Verify Python script is running before KRL execution
+3. Check error signals ($IN[2] for error condition)
+4. Review KRL logs on teach pendant
+
+## Advanced Usage
+
+### Non-Blocking Monitoring
+
+For continuous operation without blocking:
+
+```python
+import threading
+
+def monitor_state():
+ while running:
+ state = api.krl.read_param('T11')
+ if state == CRITICAL_STATE:
+ handle_critical_state()
+ time.sleep(0.1)
+
+# Run monitoring in background thread
+monitor_thread = threading.Thread(target=monitor_state, daemon=True)
+monitor_thread.start()
+
+# Main thread does other work
+perform_other_tasks()
+```
+
+### Multiple Coordination Channels
+
+Use different I/O channels for parallel coordination:
+
+```python
+# Channel 1: Main workflow
+if api.krl.wait_for_signal(1):
+ api.krl.signal_complete(1)
+
+# Channel 2: Emergency stop
+if api.io.get_input(2): # Emergency input
+ logging.error("Emergency stop!")
+ api.safety.stop()
+
+# Channel 3: Auxiliary signaling
+api.io.pulse(3, duration=0.1) # Quick pulse signal
+```
+
+### Integration with Motion Control
+
+Combine coordination with real-time RSI corrections:
+
+```python
+# Wait for motion start
+api.krl.wait_for_signal(1)
+
+# Send real-time corrections during KRL motion
+for i in range(100):
+ sensor_offset = get_sensor_offset()
+ api.motion.update_cartesian(X=sensor_offset)
+ time.sleep(0.004) # 250Hz
+
+# Signal motion complete
+api.krl.signal_complete(1)
+```
+
+## Next Steps
+
+1. **Test examples** with your robot controller
+2. **Adapt templates** to your specific application
+3. **Implement error recovery** mechanisms
+4. **Create custom workflows** for your use case
+5. **Document coordination** protocols for your team
+
+## References
+
+- [RSIPI API Documentation](../../README.md)
+- [KRL Templates](../../templates/krl/README.md)
+- [Phase 3 Summary](../../PHASE_3_SUMMARY.md) (when available)
+- [KUKA RSI 3.3 Manual](https://www.kuka.com)
+
+---
+
+**Last Updated**: January 17, 2026
+**RSIPI Version**: 2.0.0
diff --git a/src/RSIPI/io_api.py b/src/RSIPI/io_api.py
index e79d1ec..b28efc0 100644
--- a/src/RSIPI/io_api.py
+++ b/src/RSIPI/io_api.py
@@ -1,7 +1,8 @@
"""Digital I/O API namespace for RSIPI."""
import logging
-from typing import Union, TYPE_CHECKING
+import time
+from typing import Union, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from .rsi_client import RSIClient
@@ -61,15 +62,143 @@ class IOAPI:
logging.debug(f"I/O {var_name} set to {state_value}")
return result
- # TODO (Phase 3): Implement high-level I/O helpers
- # def set_output(self, channel: int, value: bool) -> str:
- # """Set digital output by channel number."""
- # pass
- #
- # def get_input(self, channel: int) -> bool:
- # """Read digital input by channel number."""
- # pass
- #
- # def pulse(self, channel: int, duration: float = 0.1) -> None:
- # """Generate a pulse on the specified output channel."""
- # pass
+ def set_output(self, channel: int, value: bool, group: str = 'Digout') -> str:
+ """
+ Set digital output by channel number.
+
+ High-level wrapper for setting digital outputs. More convenient than
+ toggle() when working with standard digital output channels.
+
+ Args:
+ channel: Output channel number (1-based, e.g., 1 for o1)
+ value: Desired state (True = ON, False = OFF)
+ group: I/O group name (default: 'Digout')
+
+ Returns:
+ Status message indicating success
+
+ Raises:
+ RSIVariableError: If the output channel doesn't exist
+ RSISafetyViolation: If safety checks prevent the operation
+
+ Example:
+ >>> api.io.set_output(1, True) # Turn ON output 1
+ 'Updated Digout.o1 to 1'
+ >>> api.io.set_output(3, False) # Turn OFF output 3
+ 'Updated Digout.o3 to 0'
+ >>> api.io.set_output(5, True, group='DiO') # Custom group
+ 'Updated DiO.5 to 1'
+
+ Note:
+ The default group 'Digout' corresponds to standard KUKA digital
+ outputs configured in RSI. Channel numbering starts at 1 to match
+ KUKA controller conventions.
+ """
+ channel_name = f"o{channel}"
+ return self.toggle(group, channel_name, value)
+
+ def get_input(self, channel: int, group: str = 'Digin') -> bool:
+ """
+ Read digital input by channel number.
+
+ High-level wrapper for reading digital input states from the robot
+ controller. Returns current state as boolean.
+
+ Args:
+ channel: Input channel number (1-based, e.g., 1 for i1)
+ group: I/O group name (default: 'Digin')
+
+ Returns:
+ True if input is HIGH/ON, False if LOW/OFF
+
+ Raises:
+ RSIVariableError: If the input channel doesn't exist in receive_variables
+
+ Example:
+ >>> # Check if input 1 is active
+ >>> if api.io.get_input(1):
+ ... print("Sensor triggered!")
+ Sensor triggered!
+
+ >>> # Read from custom group
+ >>> state = api.io.get_input(5, group='DiI')
+ >>> print(f"Input 5 state: {state}")
+ Input 5 state: True
+
+ Note:
+ This reads from receive_variables, which contains the robot
+ controller's current I/O state. Values are updated every RSI
+ cycle (~4ms).
+ """
+ from .exceptions import RSIVariableError
+
+ channel_name = f"i{channel}"
+ var_name = f"{group}.{channel_name}"
+
+ # Navigate nested receive_variables structure
+ if group in self.client.receive_variables:
+ group_dict = self.client.receive_variables.get(group, {})
+ if isinstance(group_dict, dict) and channel_name in group_dict:
+ value = group_dict[channel_name]
+ return bool(value)
+ else:
+ raise RSIVariableError(f"Input channel '{channel_name}' not found in group '{group}'")
+ else:
+ raise RSIVariableError(f"Input group '{group}' not found in receive_variables")
+
+ def pulse(self, channel: int, duration: float = 0.1, group: str = 'Digout') -> str:
+ """
+ Generate a timed pulse on the specified output channel.
+
+ Turns the output ON, waits for the specified duration, then turns it OFF.
+ Useful for triggering pneumatic actuators, solenoids, or signaling events.
+
+ Args:
+ channel: Output channel number (1-based)
+ duration: Pulse duration in seconds (default: 0.1 = 100ms)
+ group: I/O group name (default: 'Digout')
+
+ Returns:
+ Status message indicating completion
+
+ Raises:
+ RSIVariableError: If the output channel doesn't exist
+ RSISafetyViolation: If safety checks prevent the operation
+
+ Example:
+ >>> # 100ms pulse on output 2
+ >>> api.io.pulse(2)
+ 'Pulse completed on Digout.o2 (duration: 0.1s)'
+
+ >>> # 500ms pulse on output 5
+ >>> api.io.pulse(5, duration=0.5)
+ 'Pulse completed on Digout.o5 (duration: 0.5s)'
+
+ >>> # Trigger pneumatic gripper on custom channel
+ >>> api.io.pulse(3, duration=0.2, group='DiO')
+ 'Pulse completed on DiO.o3 (duration: 0.2s)'
+
+ Warning:
+ This method blocks for the duration of the pulse. For non-blocking
+ pulses, consider using threading or async I/O patterns.
+
+ Note:
+ Pulse timing accuracy depends on system load and RSI cycle time.
+ For critical timing requirements, consider hardware-timed outputs
+ or KRL-based pulse generation.
+ """
+ channel_name = f"o{channel}"
+ var_name = f"{group}.{channel_name}"
+
+ # Turn ON
+ self.set_output(channel, True, group=group)
+ logging.debug(f"Pulse started on {var_name}")
+
+ # Wait for duration
+ time.sleep(duration)
+
+ # Turn OFF
+ self.set_output(channel, False, group=group)
+ logging.info(f"Pulse completed on {var_name} (duration: {duration}s)")
+
+ return f"Pulse completed on {var_name} (duration: {duration}s)"
diff --git a/src/RSIPI/krl_api.py b/src/RSIPI/krl_api.py
index 36b3da2..3af83d3 100644
--- a/src/RSIPI/krl_api.py
+++ b/src/RSIPI/krl_api.py
@@ -1,7 +1,8 @@
"""KRL program manipulation API namespace for RSIPI."""
import logging
-from typing import Optional, TYPE_CHECKING
+import time
+from typing import Optional, Union, TYPE_CHECKING
if TYPE_CHECKING:
from .rsi_client import RSIClient
@@ -115,19 +116,267 @@ class KRLAPI:
logging.error(f"RSI injection failed: {e}")
return f"RSI injection failed: {e}"
- # TODO (Phase 3): Implement KRL coordination helpers
- # def wait_for_signal(self, channel: int, timeout: float = 5.0) -> bool:
- # """Wait for KRL to set a specific I/O signal."""
- # pass
- #
- # def signal_complete(self, channel: int) -> None:
- # """Signal to KRL that Python-side operation is complete."""
- # pass
- #
- # def write_param(self, slot: str, value: float) -> str:
- # """Write parameter to Tech.C variable for KRL to read."""
- # pass
- #
- # def read_param(self, slot: str) -> float:
- # """Read parameter from Tech.T variable written by KRL."""
- # pass
+ def wait_for_signal(
+ self,
+ channel: int,
+ timeout: float = 5.0,
+ check_interval: float = 0.01,
+ group: str = 'Digin'
+ ) -> bool:
+ """
+ Wait for KRL to set a specific I/O signal.
+
+ Blocks until the specified digital input becomes HIGH, or timeout occurs.
+ Commonly used for synchronization where Python waits for KRL to signal
+ completion of a robot operation before proceeding.
+
+ Args:
+ channel: Input channel number to monitor (1-based)
+ timeout: Maximum wait time in seconds (default: 5.0)
+ check_interval: Polling interval in seconds (default: 0.01 = 10ms)
+ group: I/O group name (default: 'Digin')
+
+ Returns:
+ True if signal received, False if timeout occurred
+
+ Example:
+ >>> # Wait for KRL to signal ready on input 3
+ >>> if api.krl.wait_for_signal(3, timeout=10.0):
+ ... print("KRL signaled ready!")
+ ... # Proceed with Python-side processing
+ ... else:
+ ... print("Timeout waiting for KRL signal")
+
+ >>> # Handshake pattern: Python waits → KRL signals → Python continues
+ >>> api.motion.update_cartesian(X=100) # Send correction
+ >>> api.krl.wait_for_signal(1) # Wait for KRL to acknowledge
+ >>> # KRL has processed the correction, safe to continue
+
+ Note:
+ This is a blocking operation. The check_interval determines polling
+ frequency - lower values provide faster response but higher CPU usage.
+ For typical RSI applications, 10-50ms intervals are appropriate.
+
+ Warning:
+ Ensure the KRL program actually sets the signal, otherwise this will
+ block until timeout. Consider using try/except for timeout handling.
+ """
+ from .io_api import IOAPI
+
+ io_api = IOAPI(self.client)
+ start_time = time.time()
+
+ logging.debug(f"Waiting for signal on {group}.i{channel} (timeout: {timeout}s)")
+
+ while (time.time() - start_time) < timeout:
+ if io_api.get_input(channel, group=group):
+ elapsed = time.time() - start_time
+ logging.info(f"Signal received on {group}.i{channel} after {elapsed:.3f}s")
+ return True
+ time.sleep(check_interval)
+
+ logging.warning(f"Timeout waiting for signal on {group}.i{channel} after {timeout}s")
+ return False
+
+ def signal_complete(self, channel: int, group: str = 'Digout') -> str:
+ """
+ Signal to KRL that Python-side operation is complete.
+
+ Sets the specified digital output HIGH to notify the KRL program that
+ Python has finished processing and KRL can proceed.
+
+ Args:
+ channel: Output channel number to signal on (1-based)
+ group: I/O group name (default: 'Digout')
+
+ Returns:
+ Status message indicating success
+
+ Raises:
+ RSIVariableError: If the output channel doesn't exist
+ RSISafetyViolation: If safety checks prevent the operation
+
+ Example:
+ >>> # Signal KRL that data processing is complete
+ >>> api.krl.signal_complete(2)
+ 'Signaled complete on Digout.o2'
+
+ >>> # Typical coordination pattern:
+ >>> # 1. KRL sends data via Tech variables
+ >>> # 2. Python processes data
+ >>> result = api.krl.read_param('T11') # Read from KRL
+ >>> processed = result * 2.0 # Process
+ >>> api.krl.write_param('C11', processed) # Write result
+ >>> api.krl.signal_complete(1) # Tell KRL we're done
+
+ Note:
+ This sets the output and leaves it HIGH. If you need to reset the
+ signal after KRL acknowledges, use api.io.pulse() instead or manually
+ call api.io.set_output(channel, False) after KRL reads the signal.
+
+ See Also:
+ wait_for_signal() - Complementary method for waiting on inputs
+ api.io.pulse() - For temporary signal pulses
+ """
+ from .io_api import IOAPI
+
+ io_api = IOAPI(self.client)
+ io_api.set_output(channel, True, group=group)
+ logging.info(f"Signaled complete on {group}.o{channel}")
+ return f"Signaled complete on {group}.o{channel}"
+
+ def write_param(self, slot: Union[int, str], value: float) -> str:
+ """
+ Write parameter to Tech.C variable for KRL to read.
+
+ Tech.C variables (C11-C199) are "Control" parameters written by Python
+ and read by KRL programs. Used for passing numerical data from Python
+ to the robot controller.
+
+ Args:
+ slot: Tech.C slot number (11-199) or string like 'C11', 'c15'
+ value: Numerical value to write
+
+ Returns:
+ Status message indicating success
+
+ Raises:
+ ValueError: If slot number is invalid (must be 11-199)
+ RSIVariableError: If Tech variable group doesn't exist
+ RSISafetyViolation: If safety checks prevent the operation
+
+ Example:
+ >>> # Send target position to KRL
+ >>> api.krl.write_param(11, 650.5) # Tech.C11 = 650.5
+ 'Updated Tech.C11 to 650.5'
+
+ >>> # Send multiple parameters
+ >>> api.krl.write_param('C12', 120.0) # X coordinate
+ >>> api.krl.write_param('C13', -50.0) # Y coordinate
+ >>> api.krl.write_param('C14', 800.0) # Z coordinate
+
+ >>> # KRL side reads with: target_x = $TECH.C[12]
+
+ Note:
+ KUKA RSI Tech variables support slots 11-199. Slots 1-10 are reserved.
+ The KRL program must read from $TECH.C[n] to access these values.
+
+ KRL Example:
+ ```krl
+ DEF my_program()
+ DECL REAL target_x, target_y, target_z
+ ; Python writes to C12, C13, C14
+ target_x = $TECH.C[12]
+ target_y = $TECH.C[13]
+ target_z = $TECH.C[14]
+ ; Use coordinates...
+ END
+ ```
+
+ See Also:
+ read_param() - Read Tech.T variables written by KRL
+ """
+ # Normalize slot to integer
+ if isinstance(slot, str):
+ slot_str = slot.upper().strip()
+ if slot_str.startswith('C'):
+ slot_num = int(slot_str[1:])
+ else:
+ slot_num = int(slot_str)
+ else:
+ slot_num = int(slot)
+
+ # Validate slot range (KUKA reserves 1-10, usable range is 11-199)
+ if not (11 <= slot_num <= 199):
+ raise ValueError(f"Tech slot must be between 11-199, got {slot_num}")
+
+ from .tools_api import ToolsAPI
+
+ tools = ToolsAPI(self.client)
+ var_name = f"Tech.C{slot_num}"
+ result = tools.update_variable(var_name, value)
+ logging.debug(f"Wrote {value} to {var_name}")
+ return result
+
+ def read_param(self, slot: Union[int, str]) -> float:
+ """
+ Read parameter from Tech.T variable written by KRL.
+
+ Tech.T variables (T11-T199) are "Transfer" parameters written by KRL
+ programs and read by Python. Used for passing numerical data from the
+ robot controller to Python.
+
+ Args:
+ slot: Tech.T slot number (11-199) or string like 'T11', 't15'
+
+ Returns:
+ Numerical value from the Tech.T slot
+
+ Raises:
+ ValueError: If slot number is invalid (must be 11-199)
+ RSIVariableError: If Tech variable group doesn't exist or slot not found
+
+ Example:
+ >>> # Read sensor value from KRL
+ >>> force = api.krl.read_param(11) # Read Tech.T11
+ >>> print(f"Force reading: {force}")
+ Force reading: 125.5
+
+ >>> # Read multiple parameters
+ >>> actual_x = api.krl.read_param('T12')
+ >>> actual_y = api.krl.read_param('T13')
+ >>> actual_z = api.krl.read_param('T14')
+
+ >>> # KRL side writes with: $TECH.T[12] = actual_pos.X
+
+ Note:
+ Tech.T variables are updated every RSI cycle (~4ms) from the robot
+ controller. Values reflect the KRL program's last write operation.
+
+ KRL Example:
+ ```krl
+ DEF my_program()
+ DECL E6POS actual_pos
+ actual_pos = $POS_ACT
+ ; Write to Tech.T for Python to read
+ $TECH.T[12] = actual_pos.X
+ $TECH.T[13] = actual_pos.Y
+ $TECH.T[14] = actual_pos.Z
+ END
+ ```
+
+ Warning:
+ Ensure the KRL program has written to the Tech.T slot before reading,
+ otherwise you'll receive the default value (typically 0.0).
+
+ See Also:
+ write_param() - Write Tech.C variables for KRL to read
+ """
+ from .exceptions import RSIVariableError
+
+ # Normalize slot to integer
+ if isinstance(slot, str):
+ slot_str = slot.upper().strip()
+ if slot_str.startswith('T'):
+ slot_num = int(slot_str[1:])
+ else:
+ slot_num = int(slot_str)
+ else:
+ slot_num = int(slot)
+
+ # Validate slot range
+ if not (11 <= slot_num <= 199):
+ raise ValueError(f"Tech slot must be between 11-199, got {slot_num}")
+
+ # Read from receive_variables
+ if 'Tech' in self.client.receive_variables:
+ tech_dict = self.client.receive_variables.get('Tech', {})
+ var_name = f"T{slot_num}"
+ if isinstance(tech_dict, dict) and var_name in tech_dict:
+ value = tech_dict[var_name]
+ logging.debug(f"Read {value} from Tech.{var_name}")
+ return float(value)
+ else:
+ raise RSIVariableError(f"Tech.{var_name} not found in receive_variables")
+ else:
+ raise RSIVariableError("Tech variable group not found in receive_variables")
diff --git a/templates/krl/README.md b/templates/krl/README.md
new file mode 100644
index 0000000..0262bac
--- /dev/null
+++ b/templates/krl/README.md
@@ -0,0 +1,393 @@
+# KRL Coordination Templates
+
+This directory contains KRL program templates demonstrating common Python-KRL coordination patterns using RSIPI.
+
+## Available Templates
+
+### 1. basic_handshake.src
+**Simple I/O handshaking between Python and KRL**
+
+- KRL signals "ready" via digital output
+- Python waits for signal, processes data
+- Python signals "complete" via input
+- KRL waits for completion, then continues
+
+**Use Case**: Basic synchronization, ensuring Python completes processing before KRL continues.
+
+**Coordination Methods Used**:
+- `api.krl.wait_for_signal(channel, timeout)`
+- `api.krl.signal_complete(channel)`
+
+### 2. parameter_passing.src
+**Bidirectional numerical data exchange via Tech variables**
+
+- KRL writes current position to Tech.T variables
+- Python reads position data
+- Python calculates target and writes to Tech.C variables
+- KRL reads target and executes motion
+
+**Use Case**: Passing numerical parameters (positions, forces, tolerances) between Python and KRL.
+
+**Coordination Methods Used**:
+- `api.krl.read_param(slot)` - Read from Tech.T
+- `api.krl.write_param(slot, value)` - Write to Tech.C
+- `api.krl.wait_for_signal(channel, timeout)`
+- `api.krl.signal_complete(channel)`
+
+### 3. state_machine.src
+**Multi-state workflow with complex coordination**
+
+Implements a 5-state machine:
+1. **IDLE**: Waiting to start
+2. **CALIBRATING**: Python performing calibration
+3. **READY**: Calibration complete
+4. **EXECUTING**: Robot motion in progress
+5. **COMPLETE**: Task finished
+6. **ERROR**: Error handling state
+
+**Use Case**: Complex workflows requiring multiple handshakes, error handling, and state tracking.
+
+**Coordination Methods Used**:
+- All coordination methods from basic_handshake and parameter_passing
+- State variable in Tech.T[11]
+- Command variable in Tech.C[11]
+
+## Python-KRL Coordination Patterns
+
+### Pattern 1: Simple Handshake
+
+```python
+# Python side
+api.krl.wait_for_signal(1) # Wait for KRL ready signal
+# Do processing...
+api.krl.signal_complete(1) # Signal KRL to continue
+```
+
+```krl
+; KRL side
+$OUT[1] = TRUE ; Signal ready to Python
+; Wait for Python completion
+WHILE $IN[1] == FALSE
+ WAIT SEC 0.1
+ENDWHILE
+```
+
+### Pattern 2: Parameter Exchange
+
+```python
+# Python side
+api.krl.wait_for_signal(1)
+
+# Read from KRL
+value = api.krl.read_param('T11')
+
+# Process and write back
+result = process(value)
+api.krl.write_param('C11', result)
+
+api.krl.signal_complete(1)
+```
+
+```krl
+; KRL side
+$TECH.T[11] = some_value
+$OUT[1] = TRUE ; Signal data ready
+
+; Wait for Python
+WHILE $IN[1] == FALSE
+ WAIT SEC 0.1
+ENDWHILE
+
+; Read result
+result = $TECH.C[11]
+```
+
+### Pattern 3: Continuous Monitoring
+
+```python
+# Python side - non-blocking monitoring loop
+api.start()
+
+while api.is_running():
+ state = api.krl.read_param('T11')
+
+ if state == 1: # Specific state
+ # React to state change
+ api.krl.write_param('C11', calculated_value)
+ api.krl.signal_complete(1)
+
+ time.sleep(0.1) # Check every 100ms
+
+api.stop()
+```
+
+```krl
+; KRL side - updates state continuously
+$TECH.T[11] = current_state
+
+; Wait for Python response when needed
+WHILE $IN[1] == FALSE
+ WAIT SEC 0.1
+ENDWHILE
+
+calculated = $TECH.C[11]
+```
+
+## Tech Variable Conventions
+
+### Tech.C Variables (Python → KRL)
+**"Control" variables - Python writes, KRL reads**
+
+| Slot | Description | Example Usage |
+|------|-------------|---------------|
+| C11 | Command/state | 0=continue, 1=pause, 2=abort |
+| C12-C14 | Position offsets | X, Y, Z corrections |
+| C15-C17 | Target position | Calculated target coordinates |
+| C18-C20 | Process parameters | Speed, force, tolerance |
+| C21+ | Custom parameters | Application-specific data |
+
+```python
+# Python writes
+api.krl.write_param('C11', 0) # Command: continue
+api.krl.write_param('C12', 5.0) # X offset
+api.krl.write_param('C13', -2.0) # Y offset
+```
+
+```krl
+; KRL reads
+command = $TECH.C[11]
+offset_x = $TECH.C[12]
+offset_y = $TECH.C[13]
+```
+
+### Tech.T Variables (KRL → Python)
+**"Transfer" variables - KRL writes, Python reads**
+
+| Slot | Description | Example Usage |
+|------|-------------|---------------|
+| T11 | Current state | State machine state number |
+| T12-T14 | Current position | X, Y, Z coordinates |
+| T15-T17 | Force/torque | Measured forces |
+| T18-T20 | Sensor readings | External sensor data |
+| T21+ | Custom data | Application-specific values |
+
+```krl
+; KRL writes
+$TECH.T[11] = current_state
+$TECH.T[12] = $POS_ACT.X
+$TECH.T[13] = $POS_ACT.Y
+$TECH.T[14] = $POS_ACT.Z
+```
+
+```python
+# Python reads
+state = api.krl.read_param('T11')
+pos_x = api.krl.read_param('T12')
+pos_y = api.krl.read_param('T13')
+pos_z = api.krl.read_param('T14')
+```
+
+## I/O Signal Conventions
+
+### Standard I/O Mapping
+
+| Signal | Type | Purpose |
+|--------|------|---------|
+| $OUT[1] | Output | KRL → Python state/ready signal |
+| $IN[1] | Input | Python → KRL completion acknowledgement |
+| $IN[2] | Input | Python → KRL error signal |
+| $OUT[2] | Output | KRL → Python auxiliary signal |
+
+```python
+# Python I/O methods
+api.krl.wait_for_signal(1) # Wait for $OUT[1]
+api.krl.signal_complete(1) # Set $IN[1]
+api.io.set_output(2, True) # Control $OUT[2]
+```
+
+## Error Handling Best Practices
+
+### Timeouts
+
+**Always use timeouts to prevent indefinite blocking:**
+
+```python
+# Python
+if not api.krl.wait_for_signal(1, timeout=10.0):
+ print("Timeout waiting for KRL!")
+ # Handle error
+```
+
+```krl
+; KRL
+INT counter
+counter = 0
+WHILE ($IN[1] == FALSE) AND (counter < 100)
+ WAIT SEC 0.1
+ counter = counter + 1
+ENDWHILE
+
+IF counter >= 100 THEN
+ ; Timeout - handle error
+ HALT
+ENDIF
+```
+
+### Error Signaling
+
+**Use dedicated error channels:**
+
+```python
+# Python detects error
+if error_condition:
+ api.io.set_output(2, True) # Signal error to KRL
+```
+
+```krl
+; KRL checks for errors
+IF $IN[2] == TRUE THEN
+ ; Python signaled error
+ HALT
+ENDIF
+```
+
+## Integration with RSI Motion Control
+
+All coordination patterns work seamlessly with RSI real-time motion corrections:
+
+```python
+# Python coordinates with KRL AND sends real-time corrections
+api.start()
+
+# Wait for KRL to start motion phase
+api.krl.wait_for_signal(1)
+
+# Send real-time corrections during KRL motion
+for i in range(100):
+ correction = calculate_correction()
+ api.motion.update_cartesian(X=correction)
+ time.sleep(0.004) # 250Hz update rate
+
+# Signal motion phase complete
+api.krl.signal_complete(1)
+
+api.stop()
+```
+
+```krl
+; KRL executes motion while Python sends corrections
+$OUT[1] = TRUE ; Signal motion start
+
+; Python sends corrections via RSI during this move
+LIN target_pos Vel=0.5 m/s CPDAT1 Tool[1] Base[0]
+
+; Wait for Python to finish corrections
+WHILE $IN[1] == FALSE
+ WAIT SEC 0.1
+ENDWHILE
+```
+
+## Testing Templates
+
+To test these templates:
+
+1. **Upload KRL program to robot controller**
+2. **Start Python coordination script**
+3. **Execute KRL program on teach pendant**
+4. **Monitor coordination in Python logs**
+
+Example Python test script:
+
+```python
+from RSIPI import RSIAPI
+import time
+
+api = RSIAPI('RSI_EthernetConfig.xml')
+api.start()
+
+try:
+ print("Waiting for KRL ready signal...")
+
+ if api.krl.wait_for_signal(1, timeout=30.0):
+ print("✅ KRL signaled ready!")
+
+ # Simulate processing
+ time.sleep(2.0)
+ print("Processing complete")
+
+ # Signal back to KRL
+ api.krl.signal_complete(1)
+ print("✅ Signaled KRL to continue")
+
+ else:
+ print("❌ Timeout waiting for KRL")
+
+except KeyboardInterrupt:
+ print("\n⚠️ Interrupted by user")
+
+finally:
+ api.stop()
+ print("API stopped")
+```
+
+## Troubleshooting
+
+### Signal Not Received
+
+**Check I/O configuration in RSI XML:**
+```xml
+
+
+
+
+
+
+
+
+
+
+```
+
+### Tech Variable Not Found
+
+**Ensure Tech variables are configured in RSI XML:**
+```xml
+
+
+
+
+
+
+
+
+
+
+```
+
+### Timing Issues
+
+- **Reduce check_interval for faster response**: `api.krl.wait_for_signal(1, check_interval=0.005)`
+- **Increase timeout for slow operations**: `api.krl.wait_for_signal(1, timeout=60.0)`
+- **Add WAIT SEC delays in KRL for signal propagation**
+
+## Next Steps
+
+After understanding these templates:
+
+1. **Adapt templates** to your specific application
+2. **Test coordination** patterns with your robot
+3. **Implement error recovery** mechanisms
+4. **Document custom** coordination protocols
+5. **Create application-specific** state machines
+
+## References
+
+- [RSIPI Documentation](../../README.md)
+- [Phase 3 Summary](../../PHASE_3_SUMMARY.md) (when available)
+- [KUKA RSI 3.3 Documentation](https://www.kuka.com)
+
+---
+
+**Template Author**: RSIPI Development Team
+**Last Updated**: January 17, 2026
+**Version**: 1.0
diff --git a/templates/krl/basic_handshake.src b/templates/krl/basic_handshake.src
new file mode 100644
index 0000000..a2db7ba
--- /dev/null
+++ b/templates/krl/basic_handshake.src
@@ -0,0 +1,80 @@
+&ACCESS RVP
+&REL 1
+&PARAM TEMPLATE = C:\KRC\Roboter\Template\vorgabe
+&PARAM EDITMASK = *
+DEF basic_handshake()
+; ================================================================
+; Basic I/O Handshake Pattern
+; ================================================================
+; Demonstrates simple bidirectional signaling between KRL and Python
+; via digital I/O channels.
+;
+; Python-KRL Coordination Flow:
+; 1. KRL signals "ready" to Python (output 1)
+; 2. Python waits for signal, then processes
+; 3. Python signals "complete" to KRL (input 1)
+; 4. KRL waits for completion signal, then continues
+;
+; Python Code Example:
+; api = RSIAPI('RSI_EthernetConfig.xml')
+; api.start()
+;
+; # Wait for KRL to signal ready
+; if api.krl.wait_for_signal(1, timeout=10.0):
+; print("KRL is ready!")
+;
+; # Do Python processing here
+; time.sleep(1.0)
+;
+; # Signal completion back to KRL
+; api.krl.signal_complete(1)
+;
+; api.stop()
+; ================================================================
+
+ ; Variable declarations
+ INT i
+ BOOL python_ready
+
+ BAS(#INITMOV, 0) ; Initialize motion
+
+ ; ============================================================
+ ; STEP 1: KRL signals ready to Python
+ ; ============================================================
+ $OUT[1] = TRUE ; Signal ready on digital output 1
+ WAIT SEC 0.1 ; Brief delay for signal propagation
+
+ ; ============================================================
+ ; STEP 2: Wait for Python to acknowledge completion
+ ; ============================================================
+ python_ready = FALSE
+ i = 0
+
+ ; Poll input 1 until Python signals completion (max 10 seconds)
+ WHILE (python_ready == FALSE) AND (i < 100)
+ IF $IN[1] == TRUE THEN
+ python_ready = TRUE
+ ELSE
+ WAIT SEC 0.1
+ i = i + 1
+ ENDIF
+ ENDWHILE
+
+ IF python_ready == TRUE THEN
+ ; Python signaled completion successfully
+ ; Safe to proceed with robot motion
+
+ ; Reset handshake signals
+ $OUT[1] = FALSE
+
+ ; Example motion with Python coordination
+ PTP HOME Vel=100 % DEFAULT
+ ; Python can send corrections during motion via RSI
+
+ ELSE
+ ; Timeout - Python did not respond
+ ; Halt program for safety
+ HALT
+ ENDIF
+
+END
diff --git a/templates/krl/parameter_passing.src b/templates/krl/parameter_passing.src
new file mode 100644
index 0000000..8904824
--- /dev/null
+++ b/templates/krl/parameter_passing.src
@@ -0,0 +1,125 @@
+&ACCESS RVP
+&REL 1
+&PARAM TEMPLATE = C:\KRC\Roboter\Template\vorgabe
+&PARAM EDITMASK = *
+DEF parameter_passing()
+; ================================================================
+; Parameter Passing via Tech Variables
+; ================================================================
+; Demonstrates bidirectional numerical data exchange between KRL
+; and Python using RSI Tech variables.
+;
+; Tech Variable Conventions:
+; - Tech.C[11-199]: Python writes, KRL reads (Control variables)
+; - Tech.T[11-199]: KRL writes, Python reads (Transfer variables)
+;
+; Python-KRL Coordination Flow:
+; 1. KRL writes current position to Tech.T
+; 2. Python reads position from Tech.T
+; 3. Python processes data and writes target to Tech.C
+; 4. KRL reads target from Tech.C and executes motion
+;
+; Python Code Example:
+; api = RSIAPI('RSI_EthernetConfig.xml')
+; api.start()
+;
+; # Wait for KRL to signal data ready
+; api.krl.wait_for_signal(1)
+;
+; # Read current position from KRL
+; current_x = api.krl.read_param('T11')
+; current_y = api.krl.read_param('T12')
+; current_z = api.krl.read_param('T13')
+;
+; # Calculate target (e.g., move 100mm in X)
+; target_x = current_x + 100.0
+; target_y = current_y
+; target_z = current_z
+;
+; # Write target back to KRL
+; api.krl.write_param('C11', target_x)
+; api.krl.write_param('C12', target_y)
+; api.krl.write_param('C13', target_z)
+;
+; # Signal completion
+; api.krl.signal_complete(1)
+;
+; api.stop()
+; ================================================================
+
+ ; Variable declarations
+ E6POS current_pos, target_pos
+ REAL target_x, target_y, target_z
+ INT i
+ BOOL data_ready
+
+ BAS(#INITMOV, 0)
+
+ ; ============================================================
+ ; STEP 1: KRL writes current position to Tech.T
+ ; ============================================================
+ current_pos = $POS_ACT ; Get current TCP position
+
+ ; Write position components to Transfer variables (Python reads)
+ $TECH.T[11] = current_pos.X
+ $TECH.T[12] = current_pos.Y
+ $TECH.T[13] = current_pos.Z
+ $TECH.T[14] = current_pos.A
+ $TECH.T[15] = current_pos.B
+ $TECH.T[16] = current_pos.C
+
+ ; ============================================================
+ ; STEP 2: Signal Python that data is ready
+ ; ============================================================
+ $OUT[1] = TRUE
+ WAIT SEC 0.1
+
+ ; ============================================================
+ ; STEP 3: Wait for Python to process and write target
+ ; ============================================================
+ data_ready = FALSE
+ i = 0
+
+ ; Poll input 1 for Python completion signal
+ WHILE (data_ready == FALSE) AND (i < 100)
+ IF $IN[1] == TRUE THEN
+ data_ready = TRUE
+ ELSE
+ WAIT SEC 0.1
+ i = i + 1
+ ENDIF
+ ENDWHILE
+
+ IF data_ready == TRUE THEN
+ ; ============================================================
+ ; STEP 4: Read target position from Tech.C
+ ; ============================================================
+ ; Python has written to Control variables (KRL reads)
+ target_x = $TECH.C[11]
+ target_y = $TECH.C[12]
+ target_z = $TECH.C[13]
+
+ ; Construct target pose
+ target_pos = current_pos ; Copy orientation
+ target_pos.X = target_x
+ target_pos.Y = target_y
+ target_pos.Z = target_z
+
+ ; ============================================================
+ ; STEP 5: Execute motion to target
+ ; ============================================================
+ ; Reset signals
+ $OUT[1] = FALSE
+
+ ; Move to calculated target
+ LIN target_pos Vel=0.5 m/s CPDAT1 Tool[1] Base[0]
+
+ ; Success indication
+ WAIT SEC 0.5
+
+ ELSE
+ ; Timeout - halt for safety
+ HALT
+ ENDIF
+
+END
diff --git a/templates/krl/state_machine.src b/templates/krl/state_machine.src
new file mode 100644
index 0000000..6245e67
--- /dev/null
+++ b/templates/krl/state_machine.src
@@ -0,0 +1,189 @@
+&ACCESS RVP
+&REL 1
+&PARAM TEMPLATE = C:\KRC\Roboter\Template\vorgabe
+&PARAM EDITMASK = *
+DEF state_machine()
+; ================================================================
+; Multi-State Coordination Pattern
+; ================================================================
+; Demonstrates a state machine for complex Python-KRL workflows.
+; Uses both I/O signals and Tech variables for robust coordination.
+;
+; State Definitions:
+; State 0: IDLE - Waiting to start
+; State 1: CALIBRATING - Python performing calibration
+; State 2: READY - Calibration complete, ready for motion
+; State 3: EXECUTING - Robot executing motion task
+; State 4: COMPLETE - Task finished
+; State 9: ERROR - Error condition
+;
+; I/O Mapping:
+; $OUT[1]: State signal to Python (pulse indicates state change)
+; $IN[1]: Python acknowledgement
+; $IN[2]: Python error signal
+;
+; Tech Variable Mapping:
+; Tech.T[11]: Current state (KRL → Python)
+; Tech.C[11]: Python command (0=continue, 1=pause, 2=abort)
+; Tech.C[12-14]: Calibration offset (X, Y, Z)
+;
+; Python Code Example:
+; api = RSIAPI('RSI_EthernetConfig.xml')
+; api.start()
+;
+; # Monitor state transitions
+; while True:
+; state = api.krl.read_param('T11')
+;
+; if state == 1: # CALIBRATING
+; print("Performing calibration...")
+; # Calibration logic here
+; offset_x = 5.0
+; offset_y = -2.0
+; offset_z = 0.5
+;
+; # Write calibration results
+; api.krl.write_param('C12', offset_x)
+; api.krl.write_param('C13', offset_y)
+; api.krl.write_param('C14', offset_z)
+;
+; # Acknowledge state completion
+; api.krl.signal_complete(1)
+;
+; elif state == 3: # EXECUTING
+; print("Robot executing motion...")
+; # Monitor execution, send corrections if needed
+; api.motion.update_cartesian(X=offset_x, Y=offset_y)
+;
+; elif state == 4: # COMPLETE
+; print("Task complete!")
+; break
+;
+; elif state == 9: # ERROR
+; print("Error detected, aborting!")
+; break
+;
+; time.sleep(0.1)
+;
+; api.stop()
+; ================================================================
+
+ ; Variable declarations
+ INT current_state, python_cmd
+ REAL offset_x, offset_y, offset_z
+ E6POS target_pos
+ BOOL ack_received
+ INT timeout_counter
+
+ BAS(#INITMOV, 0)
+
+ ; Initialize state machine
+ current_state = 0 ; IDLE
+ $TECH.T[11] = current_state
+
+ ; ============================================================
+ ; STATE 0: IDLE
+ ; ============================================================
+ current_state = 0
+ $TECH.T[11] = current_state
+ WAIT SEC 1.0
+
+ ; ============================================================
+ ; STATE 1: CALIBRATING
+ ; ============================================================
+ current_state = 1
+ $TECH.T[11] = current_state
+
+ ; Signal Python to start calibration
+ $OUT[1] = TRUE
+ WAIT SEC 0.1
+ $OUT[1] = FALSE
+
+ ; Wait for Python calibration completion
+ ack_received = FALSE
+ timeout_counter = 0
+
+ WHILE (ack_received == FALSE) AND (timeout_counter < 200)
+ ; Check for error signal
+ IF $IN[2] == TRUE THEN
+ current_state = 9 ; ERROR
+ $TECH.T[11] = current_state
+ HALT
+ ENDIF
+
+ ; Check for completion
+ IF $IN[1] == TRUE THEN
+ ack_received = TRUE
+ ELSE
+ WAIT SEC 0.1
+ timeout_counter = timeout_counter + 1
+ ENDIF
+ ENDWHILE
+
+ IF ack_received == FALSE THEN
+ ; Timeout error
+ current_state = 9
+ $TECH.T[11] = current_state
+ HALT
+ ENDIF
+
+ ; ============================================================
+ ; STATE 2: READY
+ ; ============================================================
+ ; Read calibration offsets from Python
+ offset_x = $TECH.C[12]
+ offset_y = $TECH.C[13]
+ offset_z = $TECH.C[14]
+
+ current_state = 2
+ $TECH.T[11] = current_state
+
+ ; Signal ready
+ $OUT[1] = TRUE
+ WAIT SEC 0.1
+ $OUT[1] = FALSE
+ WAIT SEC 0.5
+
+ ; ============================================================
+ ; STATE 3: EXECUTING
+ ; ============================================================
+ current_state = 3
+ $TECH.T[11] = current_state
+
+ ; Calculate target with calibration offset
+ target_pos = $POS_ACT
+ target_pos.X = target_pos.X + offset_x
+ target_pos.Y = target_pos.Y + offset_y
+ target_pos.Z = target_pos.Z + offset_z
+
+ ; Execute motion (Python can send RSI corrections during this)
+ LIN target_pos Vel=0.3 m/s CPDAT1 Tool[1] Base[0]
+
+ ; Check for pause/abort commands
+ python_cmd = $TECH.C[11]
+ IF python_cmd == 2 THEN
+ ; Abort requested
+ current_state = 9
+ $TECH.T[11] = current_state
+ HALT
+ ENDIF
+
+ ; ============================================================
+ ; STATE 4: COMPLETE
+ ; ============================================================
+ current_state = 4
+ $TECH.T[11] = current_state
+
+ ; Signal completion
+ $OUT[1] = TRUE
+ WAIT SEC 0.2
+ $OUT[1] = FALSE
+
+ ; Return to home
+ PTP HOME Vel=100 % DEFAULT
+
+ ; Final state update
+ current_state = 0 ; Back to IDLE
+ $TECH.T[11] = current_state
+
+END