Implement Phase 3: KRL Coordination

Complete implementation of Python-KRL coordination features for seamless
bidirectional communication between RSIPI and KUKA KRL programs.

## IOAPI Enhancements

Added high-level I/O control methods for convenient digital I/O manipulation:

- **set_output(channel, value, group='Digout')** - Set digital output by channel number
- **get_input(channel, group='Digin')** - Read digital input by channel number
- **pulse(channel, duration=0.1, group='Digout')** - Generate timed pulse on output

Benefits:
- Simpler channel-based addressing (channel 1 instead of 'Digout.o1')
- Automatic channel name formatting
- Built-in pulse generation for pneumatic actuators and signaling
- Consistent error handling

## KRLAPI Enhancements

Added coordination helper methods for Python-KRL synchronization:

- **wait_for_signal(channel, timeout=5.0)** - Block until KRL sets I/O signal
- **signal_complete(channel)** - Signal KRL that Python operation is complete
- **write_param(slot, value)** - Write to Tech.C variables (Python → KRL)
- **read_param(slot)** - Read from Tech.T variables (KRL → Python)

Features:
- Configurable timeouts with proper error handling
- Flexible slot addressing (11, 'C11', 'c11' all work)
- Slot validation (enforces 11-199 range)
- Comprehensive logging for debugging
- Clear docstrings with KRL code examples

## KRL Template Library

Created comprehensive KRL templates demonstrating coordination patterns:

**templates/krl/basic_handshake.src**
- Simple I/O handshaking (KRL signals → Python waits → Python signals back)
- Timeout handling and error recovery
- Complete Python code examples in comments

**templates/krl/parameter_passing.src**
- Bidirectional Tech variable communication
- KRL writes position to Tech.T, Python reads
- Python calculates target, writes to Tech.C, KRL reads
- Demonstrates full parameter exchange workflow

**templates/krl/state_machine.src**
- Multi-state coordination workflow
- States: IDLE, CALIBRATING, READY, EXECUTING, COMPLETE, ERROR
- Combines I/O signals and Tech variables
- Error handling and timeout mechanisms
- Demonstrates complex production-ready pattern

**templates/krl/README.md**
- Comprehensive coordination patterns documentation
- Tech variable mapping conventions (C vs T variables)
- I/O signal mapping standards
- Timing best practices
- Troubleshooting guide

## Python Coordination Examples

Created production-ready Python examples demonstrating all coordination methods:

**examples/coordination/01_basic_handshake.py**
- Simple I/O handshake demonstration
- Matches basic_handshake.src template
- Command-line interface with argparse
- Comprehensive logging and error handling

**examples/coordination/02_parameter_passing.py**
- Parameter exchange workflow
- Reads position from KRL (Tech.T)
- Calculates target position
- Writes target to KRL (Tech.C)
- Matches parameter_passing.src template

**examples/coordination/03_state_machine.py**
- Complex multi-state coordination
- State monitoring loop with enum
- Calibration routine with offset calculation
- Error detection and signaling
- Matches state_machine.src template

**examples/coordination/README.md**
- Complete usage instructions
- Configuration requirements
- Troubleshooting section
- Customization examples
- Advanced usage patterns

## Modified Files

src/RSIPI/io_api.py:
- Added time import
- Implemented set_output() method
- Implemented get_input() method with navigation of receive_variables
- Implemented pulse() method with blocking time.sleep()
- Comprehensive docstrings with examples

src/RSIPI/krl_api.py:
- Added time import
- Implemented wait_for_signal() with configurable polling
- Implemented signal_complete() method
- Implemented write_param() with slot normalization and validation
- Implemented read_param() with slot normalization and validation
- KRL code examples in all docstrings

## New Directories

templates/krl/
- 3 KRL program templates
- Comprehensive README with patterns and conventions

examples/coordination/
- 3 Python example scripts
- Complete usage documentation

## Design Decisions

**I/O Channel Numbering**: 1-based to match KUKA conventions
**Tech Variable Slots**: Validated 11-199 range (KUKA reserves 1-10)
**Blocking Operations**: wait_for_signal() and pulse() block with configurable timeouts
**Error Handling**: Proper exceptions with clear messages
**Logging**: Debug/Info/Warning levels for all operations
**Documentation**: Every method includes KRL code examples

## Phase 3 Status:  COMPLETE

All planned features implemented:
-  High-level Digital I/O API
-  KRL state coordination helpers
-  Parameter passing via Tech variables
-  KRL code templates
-  Python coordination examples
-  Comprehensive documentation

Next: Phase 4 (Advanced Motion Control)
This commit is contained in:
Adam 2026-01-17 00:38:32 +00:00
parent 4ce71a532e
commit 6e0b87b945
10 changed files with 1994 additions and 30 deletions

View File

@ -0,0 +1,113 @@
"""
Basic I/O Handshake Example
Demonstrates simple bidirectional signaling between Python and KRL using
digital I/O channels. Works with templates/krl/basic_handshake.src
Flow:
1. KRL signals "ready" on output 1
2. Python waits for signal
3. Python performs processing
4. Python signals "complete" on input 1
5. KRL continues
Usage:
python 01_basic_handshake.py --config RSI_EthernetConfig.xml
"""
import argparse
import time
import logging
from RSIPI import RSIAPI
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def basic_handshake_example(config_file: str) -> None:
"""
Execute basic I/O handshake with KRL program.
Args:
config_file: Path to RSI configuration XML file
"""
# Initialize API
api = RSIAPI(config_file)
try:
# Start RSI communication
logging.info("Starting RSI communication...")
api.start()
logging.info("✅ RSI started successfully")
# Wait for KRL to signal ready (digital output 1)
logging.info("Waiting for KRL ready signal...")
if api.krl.wait_for_signal(1, timeout=30.0):
logging.info("✅ KRL signaled ready!")
# Simulate Python processing (e.g., data analysis, sensor reading)
logging.info("Performing Python-side processing...")
time.sleep(2.0) # Simulated processing time
# Optional: Do actual work here
# process_sensor_data()
# calculate_corrections()
# update_database()
logging.info("✅ Processing complete")
# Signal completion back to KRL (digital input 1)
api.krl.signal_complete(1)
logging.info("✅ Signaled KRL to continue")
# KRL will now proceed with its motion program
logging.info("KRL is now free to continue motion")
else:
logging.error("❌ Timeout waiting for KRL ready signal")
logging.error("Check that KRL program is running and I/O is configured correctly")
except KeyboardInterrupt:
logging.warning("\n⚠️ Interrupted by user")
except Exception as e:
logging.error(f"❌ Error during handshake: {e}")
finally:
# Clean shutdown
logging.info("Stopping RSI communication...")
api.stop()
logging.info("✅ API stopped successfully")
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(description='Basic I/O Handshake Example')
parser.add_argument(
'--config',
type=str,
default='RSI_EthernetConfig.xml',
help='Path to RSI configuration file'
)
args = parser.parse_args()
logging.info("=" * 60)
logging.info("RSIPI - Basic I/O Handshake Example")
logging.info("=" * 60)
logging.info(f"Config: {args.config}")
logging.info("=" * 60)
basic_handshake_example(args.config)
logging.info("=" * 60)
logging.info("Example complete!")
logging.info("=" * 60)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,133 @@
"""
Parameter Passing Example
Demonstrates bidirectional numerical data exchange between Python and KRL
using RSI Tech variables. Works with templates/krl/parameter_passing.src
Flow:
1. KRL writes current position to Tech.T11-T16
2. Python waits for data ready signal
3. Python reads position from Tech.T
4. Python calculates target and writes to Tech.C11-C13
5. Python signals completion
6. KRL reads target from Tech.C and executes motion
Usage:
python 02_parameter_passing.py --config RSI_EthernetConfig.xml
"""
import argparse
import logging
from RSIPI import RSIAPI
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def parameter_passing_example(config_file: str) -> None:
"""
Execute parameter passing coordination with KRL program.
Reads current position from KRL, calculates target position,
and sends target back to KRL for execution.
Args:
config_file: Path to RSI configuration XML file
"""
api = RSIAPI(config_file)
try:
logging.info("Starting RSI communication...")
api.start()
logging.info("✅ RSI started successfully")
# Wait for KRL to signal that data is ready
logging.info("Waiting for KRL data ready signal...")
if api.krl.wait_for_signal(1, timeout=30.0):
logging.info("✅ KRL signaled data ready!")
# Read current position from Tech.T variables
logging.info("Reading current position from KRL...")
current_x = api.krl.read_param('T11')
current_y = api.krl.read_param('T12')
current_z = api.krl.read_param('T13')
current_a = api.krl.read_param('T14')
current_b = api.krl.read_param('T15')
current_c = api.krl.read_param('T16')
logging.info(f"Current position:")
logging.info(f" X: {current_x:.2f} mm")
logging.info(f" Y: {current_y:.2f} mm")
logging.info(f" Z: {current_z:.2f} mm")
logging.info(f" A: {current_a:.2f}°, B: {current_b:.2f}°, C: {current_c:.2f}°")
# Calculate target position (example: move 100mm in X, 50mm in Y)
logging.info("Calculating target position...")
target_x = current_x + 100.0 # Move 100mm in X
target_y = current_y + 50.0 # Move 50mm in Y
target_z = current_z + 0.0 # Keep Z constant
logging.info(f"Calculated target:")
logging.info(f" X: {target_x:.2f} mm (+100mm)")
logging.info(f" Y: {target_y:.2f} mm (+50mm)")
logging.info(f" Z: {target_z:.2f} mm (no change)")
# Write target position to Tech.C variables for KRL to read
logging.info("Writing target position to KRL...")
api.krl.write_param('C11', target_x)
api.krl.write_param('C12', target_y)
api.krl.write_param('C13', target_z)
logging.info("✅ Target position written to Tech.C")
# Signal KRL that calculation is complete
api.krl.signal_complete(1)
logging.info("✅ Signaled KRL that target is ready")
# KRL will now read target and execute motion
logging.info("KRL will now execute motion to calculated target")
else:
logging.error("❌ Timeout waiting for KRL data ready signal")
except KeyboardInterrupt:
logging.warning("\n⚠️ Interrupted by user")
except Exception as e:
logging.error(f"❌ Error during parameter passing: {e}")
finally:
logging.info("Stopping RSI communication...")
api.stop()
logging.info("✅ API stopped successfully")
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(description='Parameter Passing Example')
parser.add_argument(
'--config',
type=str,
default='RSI_EthernetConfig.xml',
help='Path to RSI configuration file'
)
args = parser.parse_args()
logging.info("=" * 60)
logging.info("RSIPI - Parameter Passing Example")
logging.info("=" * 60)
logging.info(f"Config: {args.config}")
logging.info("=" * 60)
parameter_passing_example(args.config)
logging.info("=" * 60)
logging.info("Example complete!")
logging.info("=" * 60)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,208 @@
"""
State Machine Coordination Example
Demonstrates a multi-state coordination workflow between Python and KRL.
Works with templates/krl/state_machine.src
States:
0: IDLE - Waiting to start
1: CALIBRATING - Python performing calibration
2: READY - Calibration complete, ready for motion
3: EXECUTING - Robot executing motion task
4: COMPLETE - Task finished
9: ERROR - Error condition
Usage:
python 03_state_machine.py --config RSI_EthernetConfig.xml
"""
import argparse
import time
import logging
from enum import IntEnum
from RSIPI import RSIAPI
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
class State(IntEnum):
"""State machine states matching KRL program."""
IDLE = 0
CALIBRATING = 1
READY = 2
EXECUTING = 3
COMPLETE = 4
ERROR = 9
def perform_calibration() -> tuple[float, float, float]:
"""
Perform calibration routine.
In a real application, this would:
- Read sensor data
- Analyze calibration target
- Calculate position offsets
- Validate results
Returns:
Tuple of (offset_x, offset_y, offset_z) in mm
"""
logging.info("Performing calibration routine...")
# Simulate calibration processing
time.sleep(2.0)
# Example calibration results (in real app, calculated from sensors)
offset_x = 5.0
offset_y = -2.0
offset_z = 0.5
logging.info(f"Calibration complete:")
logging.info(f" Offset X: {offset_x:+.2f} mm")
logging.info(f" Offset Y: {offset_y:+.2f} mm")
logging.info(f" Offset Z: {offset_z:+.2f} mm")
return (offset_x, offset_y, offset_z)
def state_machine_example(config_file: str) -> None:
"""
Execute state machine coordination with KRL program.
Monitors KRL state transitions and responds appropriately
to each state change.
Args:
config_file: Path to RSI configuration XML file
"""
api = RSIAPI(config_file)
try:
logging.info("Starting RSI communication...")
api.start()
logging.info("✅ RSI started successfully")
# Main state monitoring loop
logging.info("Monitoring state machine...")
last_state = None
calibration_offsets = (0.0, 0.0, 0.0)
while True:
# Read current state from KRL
current_state = int(api.krl.read_param('T11'))
# Only log state changes
if current_state != last_state:
logging.info(f"State changed: {State(last_state or 0).name}{State(current_state).name}")
last_state = current_state
# Handle each state
if current_state == State.IDLE:
# Waiting for KRL to start
logging.debug("Waiting in IDLE state...")
time.sleep(0.5)
elif current_state == State.CALIBRATING:
logging.info("✅ State: CALIBRATING - Starting calibration")
# Perform calibration
calibration_offsets = perform_calibration()
# Write calibration results to Tech.C for KRL
logging.info("Writing calibration offsets to KRL...")
api.krl.write_param('C12', calibration_offsets[0]) # X offset
api.krl.write_param('C13', calibration_offsets[1]) # Y offset
api.krl.write_param('C14', calibration_offsets[2]) # Z offset
# Signal calibration complete
api.krl.signal_complete(1)
logging.info("✅ Calibration complete, signaled KRL")
elif current_state == State.READY:
logging.info("✅ State: READY - System ready for motion")
# KRL is ready to execute, no action needed from Python
time.sleep(0.1)
elif current_state == State.EXECUTING:
logging.info("✅ State: EXECUTING - Robot in motion")
# Monitor execution (could send real-time corrections here)
# Example: Send RSI corrections based on sensor feedback
# api.motion.update_cartesian(X=calibration_offsets[0])
# For this example, just monitor
time.sleep(0.1)
elif current_state == State.COMPLETE:
logging.info("✅ State: COMPLETE - Task finished successfully!")
logging.info("State machine workflow complete")
break # Exit loop, task is done
elif current_state == State.ERROR:
logging.error("❌ State: ERROR - Error detected in KRL program!")
logging.error("Aborting state machine workflow")
break # Exit loop, error occurred
else:
logging.warning(f"⚠️ Unknown state: {current_state}")
time.sleep(0.1)
# Prevent tight loop
time.sleep(0.05) # Check state every 50ms
logging.info("State machine monitoring ended")
except KeyboardInterrupt:
logging.warning("\n⚠️ Interrupted by user")
# Signal error to KRL
try:
api.io.set_output(2, True) # Error signal
logging.info("Signaled error to KRL")
except:
pass
except Exception as e:
logging.error(f"❌ Error during state machine: {e}")
# Signal error to KRL
try:
api.io.set_output(2, True)
except:
pass
finally:
logging.info("Stopping RSI communication...")
api.stop()
logging.info("✅ API stopped successfully")
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(description='State Machine Coordination Example')
parser.add_argument(
'--config',
type=str,
default='RSI_EthernetConfig.xml',
help='Path to RSI configuration file'
)
args = parser.parse_args()
logging.info("=" * 60)
logging.info("RSIPI - State Machine Coordination Example")
logging.info("=" * 60)
logging.info(f"Config: {args.config}")
logging.info("=" * 60)
state_machine_example(args.config)
logging.info("=" * 60)
logging.info("Example complete!")
logging.info("=" * 60)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,345 @@
# Python-KRL Coordination Examples
This directory contains Python examples demonstrating Python-KRL coordination patterns using RSIPI Phase 3 features.
## Prerequisites
- RSIPI library installed (`pip install -e .` from rsi-pi directory)
- KUKA robot controller with RSI 3.3 configured
- RSI_EthernetConfig.xml configured with required I/O and Tech variables
- Corresponding KRL programs uploaded to robot controller (see `templates/krl/`)
## Examples
### 01_basic_handshake.py
**Simple I/O handshaking**
Demonstrates basic bidirectional signaling using digital I/O channels.
**Requires**: `templates/krl/basic_handshake.src` running on robot
**Run**:
```bash
python 01_basic_handshake.py --config path/to/RSI_EthernetConfig.xml
```
**Flow**:
1. Start Python script
2. Execute KRL program on teach pendant
3. Python waits for KRL ready signal
4. Python performs processing
5. Python signals completion
6. KRL continues with motion
**API Features Demonstrated**:
- `api.krl.wait_for_signal(channel, timeout)`
- `api.krl.signal_complete(channel)`
---
### 02_parameter_passing.py
**Bidirectional parameter exchange**
Demonstrates numerical data exchange using RSI Tech variables.
**Requires**: `templates/krl/parameter_passing.src` running on robot
**Run**:
```bash
python 02_parameter_passing.py --config path/to/RSI_EthernetConfig.xml
```
**Flow**:
1. KRL writes current position to Tech.T variables
2. Python reads position data
3. Python calculates target position
4. Python writes target to Tech.C variables
5. Python signals completion
6. KRL reads target and executes motion
**API Features Demonstrated**:
- `api.krl.read_param(slot)` - Read Tech.T variables
- `api.krl.write_param(slot, value)` - Write Tech.C variables
- `api.krl.wait_for_signal(channel, timeout)`
- `api.krl.signal_complete(channel)`
---
### 03_state_machine.py
**Multi-state workflow coordination**
Demonstrates complex state machine with error handling and calibration.
**Requires**: `templates/krl/state_machine.src` running on robot
**Run**:
```bash
python 03_state_machine.py --config path/to/RSI_EthernetConfig.xml
```
**Flow**:
1. Python monitors state variable (Tech.T11)
2. Calibration state: Python performs calibration routine
3. Python writes calibration offsets to Tech.C
4. Executing state: KRL uses offsets for motion
5. Complete state: Workflow finishes successfully
**States**:
- 0: IDLE - Waiting to start
- 1: CALIBRATING - Python calibration in progress
- 2: READY - Ready for motion
- 3: EXECUTING - Robot in motion
- 4: COMPLETE - Task finished
- 9: ERROR - Error condition
**API Features Demonstrated**:
- All coordination methods from examples 01 and 02
- State monitoring loop
- Error handling with I/O signals
- Real-time state transitions
---
## Configuration Requirements
### RSI XML Configuration
Your `RSI_EthernetConfig.xml` must include the following elements:
**Digital I/O:**
```xml
<SEND>
<XML>
<ELEMENT Tag="Digin" Type="INT"/>
</XML>
</SEND>
<RECEIVE>
<XML>
<ELEMENT Tag="Digout" Type="INT"/>
</XML>
</RECEIVE>
```
**Tech Variables:**
```xml
<SEND>
<XML>
<ELEMENT Tag="Tech" Type="DOUBLE" Indizes="[1..199]"/>
</XML>
</SEND>
<RECEIVE>
<XML>
<ELEMENT Tag="Tech" Type="DOUBLE" Indizes="[1..199]"/>
</XML>
</RECEIVE>
```
### Network Settings
Ensure your RSI network settings match:
```xml
<IP_NUMBER>192.168.1.100</IP_NUMBER> <!-- Your PC IP -->
<PORT>49152</PORT>
<SENTYPE>ImFree</SENTYPE>
```
## Running Examples
### Step 1: Start Python Script
```bash
# In one terminal
cd examples/coordination
python 01_basic_handshake.py --config ../../RSI_EthernetConfig.xml
```
### Step 2: Execute KRL Program
1. Upload corresponding KRL program to robot controller
2. Switch to AUTO mode on teach pendant
3. Select and execute the KRL program
4. Monitor coordination in Python terminal
### Step 3: Monitor Output
Python will log:
- State transitions
- I/O signal changes
- Parameter reads/writes
- Errors and warnings
**Example Output**:
```
2026-01-17 14:32:01 - INFO - Starting RSI communication...
2026-01-17 14:32:01 - INFO - ✅ RSI started successfully
2026-01-17 14:32:01 - INFO - Waiting for KRL ready signal...
2026-01-17 14:32:05 - INFO - ✅ KRL signaled ready!
2026-01-17 14:32:05 - INFO - Performing Python-side processing...
2026-01-17 14:32:07 - INFO - ✅ Processing complete
2026-01-17 14:32:07 - INFO - ✅ Signaled KRL to continue
```
## Customizing Examples
### Modify Processing Logic
In `01_basic_handshake.py`:
```python
# Replace simulated processing
time.sleep(2.0)
# With actual processing
sensor_data = read_sensor()
processed_result = analyze_data(sensor_data)
update_database(processed_result)
```
### Change Calculation Logic
In `02_parameter_passing.py`:
```python
# Modify target calculation
target_x = current_x + 100.0 # Original
target_x = calculate_adaptive_target(current_x, sensor_feedback) # Custom
```
### Extend State Machine
In `03_state_machine.py`:
```python
# Add new states
class State(IntEnum):
IDLE = 0
CALIBRATING = 1
INSPECTING = 2 # NEW STATE
READY = 3 # Renumber subsequent states
# ...
# Handle new state
elif current_state == State.INSPECTING:
inspection_result = perform_inspection()
api.krl.write_param('C20', inspection_result)
api.krl.signal_complete(1)
```
## Troubleshooting
### "Timeout waiting for KRL signal"
**Problem**: Python doesn't receive expected I/O signal from KRL
**Solutions**:
1. Verify KRL program is running on robot
2. Check I/O configuration in RSI XML
3. Verify network connectivity (ping robot IP)
4. Check signal mapping ($OUT[1] → Digout.o1)
5. Increase timeout: `api.krl.wait_for_signal(1, timeout=60.0)`
### "RSIVariableError: Tech.T11 not found"
**Problem**: Tech variable not in receive_variables
**Solutions**:
1. Add Tech variables to RSI XML `<RECEIVE>` section
2. Restart robot controller after XML changes
3. Verify variable configuration: `api.tools.show_variables()`
### "Connection refused"
**Problem**: Cannot connect to robot controller
**Solutions**:
1. Check robot IP address in RSI XML
2. Verify robot is in correct mode (T1/T2/AUTO)
3. Ensure firewall allows UDP port 49152
4. Check RSI is enabled on robot controller
### KRL Program Halts
**Problem**: KRL program stops unexpectedly
**Solutions**:
1. Check KRL timeout values (increase if needed)
2. Verify Python script is running before KRL execution
3. Check error signals ($IN[2] for error condition)
4. Review KRL logs on teach pendant
## Advanced Usage
### Non-Blocking Monitoring
For continuous operation without blocking:
```python
import threading
def monitor_state():
while running:
state = api.krl.read_param('T11')
if state == CRITICAL_STATE:
handle_critical_state()
time.sleep(0.1)
# Run monitoring in background thread
monitor_thread = threading.Thread(target=monitor_state, daemon=True)
monitor_thread.start()
# Main thread does other work
perform_other_tasks()
```
### Multiple Coordination Channels
Use different I/O channels for parallel coordination:
```python
# Channel 1: Main workflow
if api.krl.wait_for_signal(1):
api.krl.signal_complete(1)
# Channel 2: Emergency stop
if api.io.get_input(2): # Emergency input
logging.error("Emergency stop!")
api.safety.stop()
# Channel 3: Auxiliary signaling
api.io.pulse(3, duration=0.1) # Quick pulse signal
```
### Integration with Motion Control
Combine coordination with real-time RSI corrections:
```python
# Wait for motion start
api.krl.wait_for_signal(1)
# Send real-time corrections during KRL motion
for i in range(100):
sensor_offset = get_sensor_offset()
api.motion.update_cartesian(X=sensor_offset)
time.sleep(0.004) # 250Hz
# Signal motion complete
api.krl.signal_complete(1)
```
## Next Steps
1. **Test examples** with your robot controller
2. **Adapt templates** to your specific application
3. **Implement error recovery** mechanisms
4. **Create custom workflows** for your use case
5. **Document coordination** protocols for your team
## References
- [RSIPI API Documentation](../../README.md)
- [KRL Templates](../../templates/krl/README.md)
- [Phase 3 Summary](../../PHASE_3_SUMMARY.md) (when available)
- [KUKA RSI 3.3 Manual](https://www.kuka.com)
---
**Last Updated**: January 17, 2026
**RSIPI Version**: 2.0.0

View File

@ -1,7 +1,8 @@
"""Digital I/O API namespace for RSIPI."""
import logging
from typing import Union, TYPE_CHECKING
import time
from typing import Union, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from .rsi_client import RSIClient
@ -61,15 +62,143 @@ class IOAPI:
logging.debug(f"I/O {var_name} set to {state_value}")
return result
# TODO (Phase 3): Implement high-level I/O helpers
# def set_output(self, channel: int, value: bool) -> str:
# """Set digital output by channel number."""
# pass
#
# def get_input(self, channel: int) -> bool:
# """Read digital input by channel number."""
# pass
#
# def pulse(self, channel: int, duration: float = 0.1) -> None:
# """Generate a pulse on the specified output channel."""
# pass
def set_output(self, channel: int, value: bool, group: str = 'Digout') -> str:
"""
Set digital output by channel number.
High-level wrapper for setting digital outputs. More convenient than
toggle() when working with standard digital output channels.
Args:
channel: Output channel number (1-based, e.g., 1 for o1)
value: Desired state (True = ON, False = OFF)
group: I/O group name (default: 'Digout')
Returns:
Status message indicating success
Raises:
RSIVariableError: If the output channel doesn't exist
RSISafetyViolation: If safety checks prevent the operation
Example:
>>> api.io.set_output(1, True) # Turn ON output 1
'Updated Digout.o1 to 1'
>>> api.io.set_output(3, False) # Turn OFF output 3
'Updated Digout.o3 to 0'
>>> api.io.set_output(5, True, group='DiO') # Custom group
'Updated DiO.5 to 1'
Note:
The default group 'Digout' corresponds to standard KUKA digital
outputs configured in RSI. Channel numbering starts at 1 to match
KUKA controller conventions.
"""
channel_name = f"o{channel}"
return self.toggle(group, channel_name, value)
def get_input(self, channel: int, group: str = 'Digin') -> bool:
"""
Read digital input by channel number.
High-level wrapper for reading digital input states from the robot
controller. Returns current state as boolean.
Args:
channel: Input channel number (1-based, e.g., 1 for i1)
group: I/O group name (default: 'Digin')
Returns:
True if input is HIGH/ON, False if LOW/OFF
Raises:
RSIVariableError: If the input channel doesn't exist in receive_variables
Example:
>>> # Check if input 1 is active
>>> if api.io.get_input(1):
... print("Sensor triggered!")
Sensor triggered!
>>> # Read from custom group
>>> state = api.io.get_input(5, group='DiI')
>>> print(f"Input 5 state: {state}")
Input 5 state: True
Note:
This reads from receive_variables, which contains the robot
controller's current I/O state. Values are updated every RSI
cycle (~4ms).
"""
from .exceptions import RSIVariableError
channel_name = f"i{channel}"
var_name = f"{group}.{channel_name}"
# Navigate nested receive_variables structure
if group in self.client.receive_variables:
group_dict = self.client.receive_variables.get(group, {})
if isinstance(group_dict, dict) and channel_name in group_dict:
value = group_dict[channel_name]
return bool(value)
else:
raise RSIVariableError(f"Input channel '{channel_name}' not found in group '{group}'")
else:
raise RSIVariableError(f"Input group '{group}' not found in receive_variables")
def pulse(self, channel: int, duration: float = 0.1, group: str = 'Digout') -> str:
"""
Generate a timed pulse on the specified output channel.
Turns the output ON, waits for the specified duration, then turns it OFF.
Useful for triggering pneumatic actuators, solenoids, or signaling events.
Args:
channel: Output channel number (1-based)
duration: Pulse duration in seconds (default: 0.1 = 100ms)
group: I/O group name (default: 'Digout')
Returns:
Status message indicating completion
Raises:
RSIVariableError: If the output channel doesn't exist
RSISafetyViolation: If safety checks prevent the operation
Example:
>>> # 100ms pulse on output 2
>>> api.io.pulse(2)
'Pulse completed on Digout.o2 (duration: 0.1s)'
>>> # 500ms pulse on output 5
>>> api.io.pulse(5, duration=0.5)
'Pulse completed on Digout.o5 (duration: 0.5s)'
>>> # Trigger pneumatic gripper on custom channel
>>> api.io.pulse(3, duration=0.2, group='DiO')
'Pulse completed on DiO.o3 (duration: 0.2s)'
Warning:
This method blocks for the duration of the pulse. For non-blocking
pulses, consider using threading or async I/O patterns.
Note:
Pulse timing accuracy depends on system load and RSI cycle time.
For critical timing requirements, consider hardware-timed outputs
or KRL-based pulse generation.
"""
channel_name = f"o{channel}"
var_name = f"{group}.{channel_name}"
# Turn ON
self.set_output(channel, True, group=group)
logging.debug(f"Pulse started on {var_name}")
# Wait for duration
time.sleep(duration)
# Turn OFF
self.set_output(channel, False, group=group)
logging.info(f"Pulse completed on {var_name} (duration: {duration}s)")
return f"Pulse completed on {var_name} (duration: {duration}s)"

View File

@ -1,7 +1,8 @@
"""KRL program manipulation API namespace for RSIPI."""
import logging
from typing import Optional, TYPE_CHECKING
import time
from typing import Optional, Union, TYPE_CHECKING
if TYPE_CHECKING:
from .rsi_client import RSIClient
@ -115,19 +116,267 @@ class KRLAPI:
logging.error(f"RSI injection failed: {e}")
return f"RSI injection failed: {e}"
# TODO (Phase 3): Implement KRL coordination helpers
# def wait_for_signal(self, channel: int, timeout: float = 5.0) -> bool:
# """Wait for KRL to set a specific I/O signal."""
# pass
#
# def signal_complete(self, channel: int) -> None:
# """Signal to KRL that Python-side operation is complete."""
# pass
#
# def write_param(self, slot: str, value: float) -> str:
# """Write parameter to Tech.C variable for KRL to read."""
# pass
#
# def read_param(self, slot: str) -> float:
# """Read parameter from Tech.T variable written by KRL."""
# pass
def wait_for_signal(
self,
channel: int,
timeout: float = 5.0,
check_interval: float = 0.01,
group: str = 'Digin'
) -> bool:
"""
Wait for KRL to set a specific I/O signal.
Blocks until the specified digital input becomes HIGH, or timeout occurs.
Commonly used for synchronization where Python waits for KRL to signal
completion of a robot operation before proceeding.
Args:
channel: Input channel number to monitor (1-based)
timeout: Maximum wait time in seconds (default: 5.0)
check_interval: Polling interval in seconds (default: 0.01 = 10ms)
group: I/O group name (default: 'Digin')
Returns:
True if signal received, False if timeout occurred
Example:
>>> # Wait for KRL to signal ready on input 3
>>> if api.krl.wait_for_signal(3, timeout=10.0):
... print("KRL signaled ready!")
... # Proceed with Python-side processing
... else:
... print("Timeout waiting for KRL signal")
>>> # Handshake pattern: Python waits → KRL signals → Python continues
>>> api.motion.update_cartesian(X=100) # Send correction
>>> api.krl.wait_for_signal(1) # Wait for KRL to acknowledge
>>> # KRL has processed the correction, safe to continue
Note:
This is a blocking operation. The check_interval determines polling
frequency - lower values provide faster response but higher CPU usage.
For typical RSI applications, 10-50ms intervals are appropriate.
Warning:
Ensure the KRL program actually sets the signal, otherwise this will
block until timeout. Consider using try/except for timeout handling.
"""
from .io_api import IOAPI
io_api = IOAPI(self.client)
start_time = time.time()
logging.debug(f"Waiting for signal on {group}.i{channel} (timeout: {timeout}s)")
while (time.time() - start_time) < timeout:
if io_api.get_input(channel, group=group):
elapsed = time.time() - start_time
logging.info(f"Signal received on {group}.i{channel} after {elapsed:.3f}s")
return True
time.sleep(check_interval)
logging.warning(f"Timeout waiting for signal on {group}.i{channel} after {timeout}s")
return False
def signal_complete(self, channel: int, group: str = 'Digout') -> str:
"""
Signal to KRL that Python-side operation is complete.
Sets the specified digital output HIGH to notify the KRL program that
Python has finished processing and KRL can proceed.
Args:
channel: Output channel number to signal on (1-based)
group: I/O group name (default: 'Digout')
Returns:
Status message indicating success
Raises:
RSIVariableError: If the output channel doesn't exist
RSISafetyViolation: If safety checks prevent the operation
Example:
>>> # Signal KRL that data processing is complete
>>> api.krl.signal_complete(2)
'Signaled complete on Digout.o2'
>>> # Typical coordination pattern:
>>> # 1. KRL sends data via Tech variables
>>> # 2. Python processes data
>>> result = api.krl.read_param('T11') # Read from KRL
>>> processed = result * 2.0 # Process
>>> api.krl.write_param('C11', processed) # Write result
>>> api.krl.signal_complete(1) # Tell KRL we're done
Note:
This sets the output and leaves it HIGH. If you need to reset the
signal after KRL acknowledges, use api.io.pulse() instead or manually
call api.io.set_output(channel, False) after KRL reads the signal.
See Also:
wait_for_signal() - Complementary method for waiting on inputs
api.io.pulse() - For temporary signal pulses
"""
from .io_api import IOAPI
io_api = IOAPI(self.client)
io_api.set_output(channel, True, group=group)
logging.info(f"Signaled complete on {group}.o{channel}")
return f"Signaled complete on {group}.o{channel}"
def write_param(self, slot: Union[int, str], value: float) -> str:
"""
Write parameter to Tech.C variable for KRL to read.
Tech.C variables (C11-C199) are "Control" parameters written by Python
and read by KRL programs. Used for passing numerical data from Python
to the robot controller.
Args:
slot: Tech.C slot number (11-199) or string like 'C11', 'c15'
value: Numerical value to write
Returns:
Status message indicating success
Raises:
ValueError: If slot number is invalid (must be 11-199)
RSIVariableError: If Tech variable group doesn't exist
RSISafetyViolation: If safety checks prevent the operation
Example:
>>> # Send target position to KRL
>>> api.krl.write_param(11, 650.5) # Tech.C11 = 650.5
'Updated Tech.C11 to 650.5'
>>> # Send multiple parameters
>>> api.krl.write_param('C12', 120.0) # X coordinate
>>> api.krl.write_param('C13', -50.0) # Y coordinate
>>> api.krl.write_param('C14', 800.0) # Z coordinate
>>> # KRL side reads with: target_x = $TECH.C[12]
Note:
KUKA RSI Tech variables support slots 11-199. Slots 1-10 are reserved.
The KRL program must read from $TECH.C[n] to access these values.
KRL Example:
```krl
DEF my_program()
DECL REAL target_x, target_y, target_z
; Python writes to C12, C13, C14
target_x = $TECH.C[12]
target_y = $TECH.C[13]
target_z = $TECH.C[14]
; Use coordinates...
END
```
See Also:
read_param() - Read Tech.T variables written by KRL
"""
# Normalize slot to integer
if isinstance(slot, str):
slot_str = slot.upper().strip()
if slot_str.startswith('C'):
slot_num = int(slot_str[1:])
else:
slot_num = int(slot_str)
else:
slot_num = int(slot)
# Validate slot range (KUKA reserves 1-10, usable range is 11-199)
if not (11 <= slot_num <= 199):
raise ValueError(f"Tech slot must be between 11-199, got {slot_num}")
from .tools_api import ToolsAPI
tools = ToolsAPI(self.client)
var_name = f"Tech.C{slot_num}"
result = tools.update_variable(var_name, value)
logging.debug(f"Wrote {value} to {var_name}")
return result
def read_param(self, slot: Union[int, str]) -> float:
"""
Read parameter from Tech.T variable written by KRL.
Tech.T variables (T11-T199) are "Transfer" parameters written by KRL
programs and read by Python. Used for passing numerical data from the
robot controller to Python.
Args:
slot: Tech.T slot number (11-199) or string like 'T11', 't15'
Returns:
Numerical value from the Tech.T slot
Raises:
ValueError: If slot number is invalid (must be 11-199)
RSIVariableError: If Tech variable group doesn't exist or slot not found
Example:
>>> # Read sensor value from KRL
>>> force = api.krl.read_param(11) # Read Tech.T11
>>> print(f"Force reading: {force}")
Force reading: 125.5
>>> # Read multiple parameters
>>> actual_x = api.krl.read_param('T12')
>>> actual_y = api.krl.read_param('T13')
>>> actual_z = api.krl.read_param('T14')
>>> # KRL side writes with: $TECH.T[12] = actual_pos.X
Note:
Tech.T variables are updated every RSI cycle (~4ms) from the robot
controller. Values reflect the KRL program's last write operation.
KRL Example:
```krl
DEF my_program()
DECL E6POS actual_pos
actual_pos = $POS_ACT
; Write to Tech.T for Python to read
$TECH.T[12] = actual_pos.X
$TECH.T[13] = actual_pos.Y
$TECH.T[14] = actual_pos.Z
END
```
Warning:
Ensure the KRL program has written to the Tech.T slot before reading,
otherwise you'll receive the default value (typically 0.0).
See Also:
write_param() - Write Tech.C variables for KRL to read
"""
from .exceptions import RSIVariableError
# Normalize slot to integer
if isinstance(slot, str):
slot_str = slot.upper().strip()
if slot_str.startswith('T'):
slot_num = int(slot_str[1:])
else:
slot_num = int(slot_str)
else:
slot_num = int(slot)
# Validate slot range
if not (11 <= slot_num <= 199):
raise ValueError(f"Tech slot must be between 11-199, got {slot_num}")
# Read from receive_variables
if 'Tech' in self.client.receive_variables:
tech_dict = self.client.receive_variables.get('Tech', {})
var_name = f"T{slot_num}"
if isinstance(tech_dict, dict) and var_name in tech_dict:
value = tech_dict[var_name]
logging.debug(f"Read {value} from Tech.{var_name}")
return float(value)
else:
raise RSIVariableError(f"Tech.{var_name} not found in receive_variables")
else:
raise RSIVariableError("Tech variable group not found in receive_variables")

393
templates/krl/README.md Normal file
View File

@ -0,0 +1,393 @@
# KRL Coordination Templates
This directory contains KRL program templates demonstrating common Python-KRL coordination patterns using RSIPI.
## Available Templates
### 1. basic_handshake.src
**Simple I/O handshaking between Python and KRL**
- KRL signals "ready" via digital output
- Python waits for signal, processes data
- Python signals "complete" via input
- KRL waits for completion, then continues
**Use Case**: Basic synchronization, ensuring Python completes processing before KRL continues.
**Coordination Methods Used**:
- `api.krl.wait_for_signal(channel, timeout)`
- `api.krl.signal_complete(channel)`
### 2. parameter_passing.src
**Bidirectional numerical data exchange via Tech variables**
- KRL writes current position to Tech.T variables
- Python reads position data
- Python calculates target and writes to Tech.C variables
- KRL reads target and executes motion
**Use Case**: Passing numerical parameters (positions, forces, tolerances) between Python and KRL.
**Coordination Methods Used**:
- `api.krl.read_param(slot)` - Read from Tech.T
- `api.krl.write_param(slot, value)` - Write to Tech.C
- `api.krl.wait_for_signal(channel, timeout)`
- `api.krl.signal_complete(channel)`
### 3. state_machine.src
**Multi-state workflow with complex coordination**
Implements a 5-state machine:
1. **IDLE**: Waiting to start
2. **CALIBRATING**: Python performing calibration
3. **READY**: Calibration complete
4. **EXECUTING**: Robot motion in progress
5. **COMPLETE**: Task finished
6. **ERROR**: Error handling state
**Use Case**: Complex workflows requiring multiple handshakes, error handling, and state tracking.
**Coordination Methods Used**:
- All coordination methods from basic_handshake and parameter_passing
- State variable in Tech.T[11]
- Command variable in Tech.C[11]
## Python-KRL Coordination Patterns
### Pattern 1: Simple Handshake
```python
# Python side
api.krl.wait_for_signal(1) # Wait for KRL ready signal
# Do processing...
api.krl.signal_complete(1) # Signal KRL to continue
```
```krl
; KRL side
$OUT[1] = TRUE ; Signal ready to Python
; Wait for Python completion
WHILE $IN[1] == FALSE
WAIT SEC 0.1
ENDWHILE
```
### Pattern 2: Parameter Exchange
```python
# Python side
api.krl.wait_for_signal(1)
# Read from KRL
value = api.krl.read_param('T11')
# Process and write back
result = process(value)
api.krl.write_param('C11', result)
api.krl.signal_complete(1)
```
```krl
; KRL side
$TECH.T[11] = some_value
$OUT[1] = TRUE ; Signal data ready
; Wait for Python
WHILE $IN[1] == FALSE
WAIT SEC 0.1
ENDWHILE
; Read result
result = $TECH.C[11]
```
### Pattern 3: Continuous Monitoring
```python
# Python side - non-blocking monitoring loop
api.start()
while api.is_running():
state = api.krl.read_param('T11')
if state == 1: # Specific state
# React to state change
api.krl.write_param('C11', calculated_value)
api.krl.signal_complete(1)
time.sleep(0.1) # Check every 100ms
api.stop()
```
```krl
; KRL side - updates state continuously
$TECH.T[11] = current_state
; Wait for Python response when needed
WHILE $IN[1] == FALSE
WAIT SEC 0.1
ENDWHILE
calculated = $TECH.C[11]
```
## Tech Variable Conventions
### Tech.C Variables (Python → KRL)
**"Control" variables - Python writes, KRL reads**
| Slot | Description | Example Usage |
|------|-------------|---------------|
| C11 | Command/state | 0=continue, 1=pause, 2=abort |
| C12-C14 | Position offsets | X, Y, Z corrections |
| C15-C17 | Target position | Calculated target coordinates |
| C18-C20 | Process parameters | Speed, force, tolerance |
| C21+ | Custom parameters | Application-specific data |
```python
# Python writes
api.krl.write_param('C11', 0) # Command: continue
api.krl.write_param('C12', 5.0) # X offset
api.krl.write_param('C13', -2.0) # Y offset
```
```krl
; KRL reads
command = $TECH.C[11]
offset_x = $TECH.C[12]
offset_y = $TECH.C[13]
```
### Tech.T Variables (KRL → Python)
**"Transfer" variables - KRL writes, Python reads**
| Slot | Description | Example Usage |
|------|-------------|---------------|
| T11 | Current state | State machine state number |
| T12-T14 | Current position | X, Y, Z coordinates |
| T15-T17 | Force/torque | Measured forces |
| T18-T20 | Sensor readings | External sensor data |
| T21+ | Custom data | Application-specific values |
```krl
; KRL writes
$TECH.T[11] = current_state
$TECH.T[12] = $POS_ACT.X
$TECH.T[13] = $POS_ACT.Y
$TECH.T[14] = $POS_ACT.Z
```
```python
# Python reads
state = api.krl.read_param('T11')
pos_x = api.krl.read_param('T12')
pos_y = api.krl.read_param('T13')
pos_z = api.krl.read_param('T14')
```
## I/O Signal Conventions
### Standard I/O Mapping
| Signal | Type | Purpose |
|--------|------|---------|
| $OUT[1] | Output | KRL → Python state/ready signal |
| $IN[1] | Input | Python → KRL completion acknowledgement |
| $IN[2] | Input | Python → KRL error signal |
| $OUT[2] | Output | KRL → Python auxiliary signal |
```python
# Python I/O methods
api.krl.wait_for_signal(1) # Wait for $OUT[1]
api.krl.signal_complete(1) # Set $IN[1]
api.io.set_output(2, True) # Control $OUT[2]
```
## Error Handling Best Practices
### Timeouts
**Always use timeouts to prevent indefinite blocking:**
```python
# Python
if not api.krl.wait_for_signal(1, timeout=10.0):
print("Timeout waiting for KRL!")
# Handle error
```
```krl
; KRL
INT counter
counter = 0
WHILE ($IN[1] == FALSE) AND (counter < 100)
WAIT SEC 0.1
counter = counter + 1
ENDWHILE
IF counter >= 100 THEN
; Timeout - handle error
HALT
ENDIF
```
### Error Signaling
**Use dedicated error channels:**
```python
# Python detects error
if error_condition:
api.io.set_output(2, True) # Signal error to KRL
```
```krl
; KRL checks for errors
IF $IN[2] == TRUE THEN
; Python signaled error
HALT
ENDIF
```
## Integration with RSI Motion Control
All coordination patterns work seamlessly with RSI real-time motion corrections:
```python
# Python coordinates with KRL AND sends real-time corrections
api.start()
# Wait for KRL to start motion phase
api.krl.wait_for_signal(1)
# Send real-time corrections during KRL motion
for i in range(100):
correction = calculate_correction()
api.motion.update_cartesian(X=correction)
time.sleep(0.004) # 250Hz update rate
# Signal motion phase complete
api.krl.signal_complete(1)
api.stop()
```
```krl
; KRL executes motion while Python sends corrections
$OUT[1] = TRUE ; Signal motion start
; Python sends corrections via RSI during this move
LIN target_pos Vel=0.5 m/s CPDAT1 Tool[1] Base[0]
; Wait for Python to finish corrections
WHILE $IN[1] == FALSE
WAIT SEC 0.1
ENDWHILE
```
## Testing Templates
To test these templates:
1. **Upload KRL program to robot controller**
2. **Start Python coordination script**
3. **Execute KRL program on teach pendant**
4. **Monitor coordination in Python logs**
Example Python test script:
```python
from RSIPI import RSIAPI
import time
api = RSIAPI('RSI_EthernetConfig.xml')
api.start()
try:
print("Waiting for KRL ready signal...")
if api.krl.wait_for_signal(1, timeout=30.0):
print("✅ KRL signaled ready!")
# Simulate processing
time.sleep(2.0)
print("Processing complete")
# Signal back to KRL
api.krl.signal_complete(1)
print("✅ Signaled KRL to continue")
else:
print("❌ Timeout waiting for KRL")
except KeyboardInterrupt:
print("\n⚠ Interrupted by user")
finally:
api.stop()
print("API stopped")
```
## Troubleshooting
### Signal Not Received
**Check I/O configuration in RSI XML:**
```xml
<SEND>
<XML>
<ELEMENT Tag="Digin" Type="INT"/>
</XML>
</SEND>
<RECEIVE>
<XML>
<ELEMENT Tag="Digout" Type="INT"/>
</XML>
</RECEIVE>
```
### Tech Variable Not Found
**Ensure Tech variables are configured in RSI XML:**
```xml
<SEND>
<XML>
<ELEMENT Tag="Tech" Type="DOUBLE" Indizes="[1..199]"/>
</XML>
</SEND>
<RECEIVE>
<XML>
<ELEMENT Tag="Tech" Type="DOUBLE" Indizes="[1..199]"/>
</XML>
</RECEIVE>
```
### Timing Issues
- **Reduce check_interval for faster response**: `api.krl.wait_for_signal(1, check_interval=0.005)`
- **Increase timeout for slow operations**: `api.krl.wait_for_signal(1, timeout=60.0)`
- **Add WAIT SEC delays in KRL for signal propagation**
## Next Steps
After understanding these templates:
1. **Adapt templates** to your specific application
2. **Test coordination** patterns with your robot
3. **Implement error recovery** mechanisms
4. **Document custom** coordination protocols
5. **Create application-specific** state machines
## References
- [RSIPI Documentation](../../README.md)
- [Phase 3 Summary](../../PHASE_3_SUMMARY.md) (when available)
- [KUKA RSI 3.3 Documentation](https://www.kuka.com)
---
**Template Author**: RSIPI Development Team
**Last Updated**: January 17, 2026
**Version**: 1.0

View File

@ -0,0 +1,80 @@
&ACCESS RVP
&REL 1
&PARAM TEMPLATE = C:\KRC\Roboter\Template\vorgabe
&PARAM EDITMASK = *
DEF basic_handshake()
; ================================================================
; Basic I/O Handshake Pattern
; ================================================================
; Demonstrates simple bidirectional signaling between KRL and Python
; via digital I/O channels.
;
; Python-KRL Coordination Flow:
; 1. KRL signals "ready" to Python (output 1)
; 2. Python waits for signal, then processes
; 3. Python signals "complete" to KRL (input 1)
; 4. KRL waits for completion signal, then continues
;
; Python Code Example:
; api = RSIAPI('RSI_EthernetConfig.xml')
; api.start()
;
; # Wait for KRL to signal ready
; if api.krl.wait_for_signal(1, timeout=10.0):
; print("KRL is ready!")
;
; # Do Python processing here
; time.sleep(1.0)
;
; # Signal completion back to KRL
; api.krl.signal_complete(1)
;
; api.stop()
; ================================================================
; Variable declarations
INT i
BOOL python_ready
BAS(#INITMOV, 0) ; Initialize motion
; ============================================================
; STEP 1: KRL signals ready to Python
; ============================================================
$OUT[1] = TRUE ; Signal ready on digital output 1
WAIT SEC 0.1 ; Brief delay for signal propagation
; ============================================================
; STEP 2: Wait for Python to acknowledge completion
; ============================================================
python_ready = FALSE
i = 0
; Poll input 1 until Python signals completion (max 10 seconds)
WHILE (python_ready == FALSE) AND (i < 100)
IF $IN[1] == TRUE THEN
python_ready = TRUE
ELSE
WAIT SEC 0.1
i = i + 1
ENDIF
ENDWHILE
IF python_ready == TRUE THEN
; Python signaled completion successfully
; Safe to proceed with robot motion
; Reset handshake signals
$OUT[1] = FALSE
; Example motion with Python coordination
PTP HOME Vel=100 % DEFAULT
; Python can send corrections during motion via RSI
ELSE
; Timeout - Python did not respond
; Halt program for safety
HALT
ENDIF
END

View File

@ -0,0 +1,125 @@
&ACCESS RVP
&REL 1
&PARAM TEMPLATE = C:\KRC\Roboter\Template\vorgabe
&PARAM EDITMASK = *
DEF parameter_passing()
; ================================================================
; Parameter Passing via Tech Variables
; ================================================================
; Demonstrates bidirectional numerical data exchange between KRL
; and Python using RSI Tech variables.
;
; Tech Variable Conventions:
; - Tech.C[11-199]: Python writes, KRL reads (Control variables)
; - Tech.T[11-199]: KRL writes, Python reads (Transfer variables)
;
; Python-KRL Coordination Flow:
; 1. KRL writes current position to Tech.T
; 2. Python reads position from Tech.T
; 3. Python processes data and writes target to Tech.C
; 4. KRL reads target from Tech.C and executes motion
;
; Python Code Example:
; api = RSIAPI('RSI_EthernetConfig.xml')
; api.start()
;
; # Wait for KRL to signal data ready
; api.krl.wait_for_signal(1)
;
; # Read current position from KRL
; current_x = api.krl.read_param('T11')
; current_y = api.krl.read_param('T12')
; current_z = api.krl.read_param('T13')
;
; # Calculate target (e.g., move 100mm in X)
; target_x = current_x + 100.0
; target_y = current_y
; target_z = current_z
;
; # Write target back to KRL
; api.krl.write_param('C11', target_x)
; api.krl.write_param('C12', target_y)
; api.krl.write_param('C13', target_z)
;
; # Signal completion
; api.krl.signal_complete(1)
;
; api.stop()
; ================================================================
; Variable declarations
E6POS current_pos, target_pos
REAL target_x, target_y, target_z
INT i
BOOL data_ready
BAS(#INITMOV, 0)
; ============================================================
; STEP 1: KRL writes current position to Tech.T
; ============================================================
current_pos = $POS_ACT ; Get current TCP position
; Write position components to Transfer variables (Python reads)
$TECH.T[11] = current_pos.X
$TECH.T[12] = current_pos.Y
$TECH.T[13] = current_pos.Z
$TECH.T[14] = current_pos.A
$TECH.T[15] = current_pos.B
$TECH.T[16] = current_pos.C
; ============================================================
; STEP 2: Signal Python that data is ready
; ============================================================
$OUT[1] = TRUE
WAIT SEC 0.1
; ============================================================
; STEP 3: Wait for Python to process and write target
; ============================================================
data_ready = FALSE
i = 0
; Poll input 1 for Python completion signal
WHILE (data_ready == FALSE) AND (i < 100)
IF $IN[1] == TRUE THEN
data_ready = TRUE
ELSE
WAIT SEC 0.1
i = i + 1
ENDIF
ENDWHILE
IF data_ready == TRUE THEN
; ============================================================
; STEP 4: Read target position from Tech.C
; ============================================================
; Python has written to Control variables (KRL reads)
target_x = $TECH.C[11]
target_y = $TECH.C[12]
target_z = $TECH.C[13]
; Construct target pose
target_pos = current_pos ; Copy orientation
target_pos.X = target_x
target_pos.Y = target_y
target_pos.Z = target_z
; ============================================================
; STEP 5: Execute motion to target
; ============================================================
; Reset signals
$OUT[1] = FALSE
; Move to calculated target
LIN target_pos Vel=0.5 m/s CPDAT1 Tool[1] Base[0]
; Success indication
WAIT SEC 0.5
ELSE
; Timeout - halt for safety
HALT
ENDIF
END

View File

@ -0,0 +1,189 @@
&ACCESS RVP
&REL 1
&PARAM TEMPLATE = C:\KRC\Roboter\Template\vorgabe
&PARAM EDITMASK = *
DEF state_machine()
; ================================================================
; Multi-State Coordination Pattern
; ================================================================
; Demonstrates a state machine for complex Python-KRL workflows.
; Uses both I/O signals and Tech variables for robust coordination.
;
; State Definitions:
; State 0: IDLE - Waiting to start
; State 1: CALIBRATING - Python performing calibration
; State 2: READY - Calibration complete, ready for motion
; State 3: EXECUTING - Robot executing motion task
; State 4: COMPLETE - Task finished
; State 9: ERROR - Error condition
;
; I/O Mapping:
; $OUT[1]: State signal to Python (pulse indicates state change)
; $IN[1]: Python acknowledgement
; $IN[2]: Python error signal
;
; Tech Variable Mapping:
; Tech.T[11]: Current state (KRL → Python)
; Tech.C[11]: Python command (0=continue, 1=pause, 2=abort)
; Tech.C[12-14]: Calibration offset (X, Y, Z)
;
; Python Code Example:
; api = RSIAPI('RSI_EthernetConfig.xml')
; api.start()
;
; # Monitor state transitions
; while True:
; state = api.krl.read_param('T11')
;
; if state == 1: # CALIBRATING
; print("Performing calibration...")
; # Calibration logic here
; offset_x = 5.0
; offset_y = -2.0
; offset_z = 0.5
;
; # Write calibration results
; api.krl.write_param('C12', offset_x)
; api.krl.write_param('C13', offset_y)
; api.krl.write_param('C14', offset_z)
;
; # Acknowledge state completion
; api.krl.signal_complete(1)
;
; elif state == 3: # EXECUTING
; print("Robot executing motion...")
; # Monitor execution, send corrections if needed
; api.motion.update_cartesian(X=offset_x, Y=offset_y)
;
; elif state == 4: # COMPLETE
; print("Task complete!")
; break
;
; elif state == 9: # ERROR
; print("Error detected, aborting!")
; break
;
; time.sleep(0.1)
;
; api.stop()
; ================================================================
; Variable declarations
INT current_state, python_cmd
REAL offset_x, offset_y, offset_z
E6POS target_pos
BOOL ack_received
INT timeout_counter
BAS(#INITMOV, 0)
; Initialize state machine
current_state = 0 ; IDLE
$TECH.T[11] = current_state
; ============================================================
; STATE 0: IDLE
; ============================================================
current_state = 0
$TECH.T[11] = current_state
WAIT SEC 1.0
; ============================================================
; STATE 1: CALIBRATING
; ============================================================
current_state = 1
$TECH.T[11] = current_state
; Signal Python to start calibration
$OUT[1] = TRUE
WAIT SEC 0.1
$OUT[1] = FALSE
; Wait for Python calibration completion
ack_received = FALSE
timeout_counter = 0
WHILE (ack_received == FALSE) AND (timeout_counter < 200)
; Check for error signal
IF $IN[2] == TRUE THEN
current_state = 9 ; ERROR
$TECH.T[11] = current_state
HALT
ENDIF
; Check for completion
IF $IN[1] == TRUE THEN
ack_received = TRUE
ELSE
WAIT SEC 0.1
timeout_counter = timeout_counter + 1
ENDIF
ENDWHILE
IF ack_received == FALSE THEN
; Timeout error
current_state = 9
$TECH.T[11] = current_state
HALT
ENDIF
; ============================================================
; STATE 2: READY
; ============================================================
; Read calibration offsets from Python
offset_x = $TECH.C[12]
offset_y = $TECH.C[13]
offset_z = $TECH.C[14]
current_state = 2
$TECH.T[11] = current_state
; Signal ready
$OUT[1] = TRUE
WAIT SEC 0.1
$OUT[1] = FALSE
WAIT SEC 0.5
; ============================================================
; STATE 3: EXECUTING
; ============================================================
current_state = 3
$TECH.T[11] = current_state
; Calculate target with calibration offset
target_pos = $POS_ACT
target_pos.X = target_pos.X + offset_x
target_pos.Y = target_pos.Y + offset_y
target_pos.Z = target_pos.Z + offset_z
; Execute motion (Python can send RSI corrections during this)
LIN target_pos Vel=0.3 m/s CPDAT1 Tool[1] Base[0]
; Check for pause/abort commands
python_cmd = $TECH.C[11]
IF python_cmd == 2 THEN
; Abort requested
current_state = 9
$TECH.T[11] = current_state
HALT
ENDIF
; ============================================================
; STATE 4: COMPLETE
; ============================================================
current_state = 4
$TECH.T[11] = current_state
; Signal completion
$OUT[1] = TRUE
WAIT SEC 0.2
$OUT[1] = FALSE
; Return to home
PTP HOME Vel=100 % DEFAULT
; Final state update
current_state = 0 ; Back to IDLE
$TECH.T[11] = current_state
END