RSI-PI/examples/coordination/02_parameter_passing.py
Adam 6e0b87b945 Implement Phase 3: KRL Coordination
Complete implementation of Python-KRL coordination features for seamless
bidirectional communication between RSIPI and KUKA KRL programs.

## IOAPI Enhancements

Added high-level I/O control methods for convenient digital I/O manipulation:

- **set_output(channel, value, group='Digout')** - Set digital output by channel number
- **get_input(channel, group='Digin')** - Read digital input by channel number
- **pulse(channel, duration=0.1, group='Digout')** - Generate timed pulse on output

Benefits:
- Simpler channel-based addressing (channel 1 instead of 'Digout.o1')
- Automatic channel name formatting
- Built-in pulse generation for pneumatic actuators and signaling
- Consistent error handling
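
The channel formatting and pulse behaviour described above can be sketched roughly as follows. This is an illustration only, not the RSIPI implementation: `channel_name`, the stand-alone `pulse` function, and the plain `outputs` dict (standing in for RSIPI's send variables) are hypothetical, and the `o` prefix is inferred from the `'Digout.o1'` example.

```python
import time


def channel_name(channel: int, group: str = "Digout") -> str:
    """Build a dotted output name such as 'Digout.o1' from a 1-based channel."""
    if channel < 1:
        raise ValueError(f"channel must be >= 1, got {channel}")
    return f"{group}.o{channel}"  # 'o' prefix assumed from the 'Digout.o1' example


def pulse(outputs: dict, channel: int, duration: float = 0.1,
          group: str = "Digout") -> None:
    """Drive one output high, block for `duration` seconds, then drive it low."""
    name = channel_name(channel, group)
    outputs[name] = 1
    try:
        time.sleep(duration)  # blocking, as the commit notes for pulse()
    finally:
        outputs[name] = 0  # always release the output, even if interrupted


# `outputs` is a hypothetical stand-in for the values sent to the robot.
outputs = {}
pulse(outputs, 1, duration=0.01)
```

Resetting the output in a `finally` block is one way to avoid leaving an actuator energised if the sleep is interrupted.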

## KRLAPI Enhancements

Added coordination helper methods for Python-KRL synchronization:

- **wait_for_signal(channel, timeout=5.0)** - Block until KRL sets I/O signal
- **signal_complete(channel)** - Signal KRL that Python operation is complete
- **write_param(slot, value)** - Write to Tech.C variables (Python → KRL)
- **read_param(slot)** - Read from Tech.T variables (KRL → Python)

Features:
- Configurable timeouts with proper error handling
- Flexible slot addressing (11, 'C11', 'c11' all work)
- Slot validation (enforces 11-199 range)
- Comprehensive logging for debugging
- Clear docstrings with KRL code examples
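
A minimal sketch of how the slot normalization and range check listed above might work. The helper name `normalize_slot` and its `prefix` parameter are assumptions for illustration; the actual code in `krl_api.py` may differ.

```python
def normalize_slot(slot, prefix: str = "C") -> str:
    """Return a canonical slot name such as 'C11' from 11, 'C11', or 'c11'."""
    text = str(slot).strip().upper()
    if text.startswith(prefix):
        text = text[len(prefix):]
    if not text.isdecimal():
        raise ValueError(f"invalid slot: {slot!r}")
    number = int(text)
    # The commit message states that only slots 11-199 are accepted.
    if not 11 <= number <= 199:
        raise ValueError(f"slot {number} is outside the allowed range 11-199")
    return f"{prefix}{number}"


print(normalize_slot(11), normalize_slot("C11"), normalize_slot("c11"))
print(normalize_slot("t12", prefix="T"))
```

Normalizing to a single canonical form before validating keeps the range check in one place for both `write_param()` and `read_param()`.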

## KRL Template Library

Created comprehensive KRL templates demonstrating coordination patterns:

**templates/krl/basic_handshake.src**
- Simple I/O handshaking (KRL signals → Python waits → Python signals back)
- Timeout handling and error recovery
- Complete Python code examples in comments

**templates/krl/parameter_passing.src**
- Bidirectional Tech variable communication
- KRL writes position to Tech.T, Python reads
- Python calculates target, writes to Tech.C, KRL reads
- Demonstrates full parameter exchange workflow

**templates/krl/state_machine.src**
- Multi-state coordination workflow
- States: IDLE, CALIBRATING, READY, EXECUTING, COMPLETE, ERROR
- Combines I/O signals and Tech variables
- Error handling and timeout mechanisms
- Demonstrates complex production-ready pattern

**templates/krl/README.md**
- Comprehensive coordination patterns documentation
- Tech variable mapping conventions (C vs T variables)
- I/O signal mapping standards
- Timing best practices
- Troubleshooting guide

## Python Coordination Examples

Created production-ready Python examples demonstrating all coordination methods:

**examples/coordination/01_basic_handshake.py**
- Simple I/O handshake demonstration
- Matches basic_handshake.src template
- Command-line interface with argparse
- Comprehensive logging and error handling

**examples/coordination/02_parameter_passing.py**
- Parameter exchange workflow
- Reads position from KRL (Tech.T)
- Calculates target position
- Writes target to KRL (Tech.C)
- Matches parameter_passing.src template

**examples/coordination/03_state_machine.py**
- Complex multi-state coordination
- State monitoring loop with enum
- Calibration routine with offset calculation
- Error detection and signaling
- Matches state_machine.src template
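
The enum-based monitoring loop mentioned above could look something like the sketch below. The state names come from the state_machine.src description; the numeric codes, the `CoordinationState` and `monitor` names, and the `read_state` callable are assumptions made for illustration, not taken from 03_state_machine.py.

```python
from enum import Enum


class CoordinationState(Enum):
    # Numeric codes are assumed; the real mapping is defined by the KRL template.
    IDLE = 0
    CALIBRATING = 1
    READY = 2
    EXECUTING = 3
    COMPLETE = 4
    ERROR = 5


TERMINAL_STATES = {CoordinationState.COMPLETE, CoordinationState.ERROR}


def monitor(read_state, max_polls: int = 100) -> list:
    """Poll `read_state` and record each state change until a terminal state."""
    history = []
    for _ in range(max_polls):
        state = CoordinationState(int(read_state()))
        if not history or history[-1] is not state:
            history.append(state)
        if state in TERMINAL_STATES:
            break
    return history


# Simulated readings standing in for a state value published by KRL.
readings = iter([0, 1, 1, 2, 3, 3, 4])
history = monitor(lambda: next(readings))
print([s.name for s in history])
# prints ['IDLE', 'CALIBRATING', 'READY', 'EXECUTING', 'COMPLETE']
```

A real loop would sleep between polls and read the state from the robot, for example from a Tech.T slot, rather than from a list.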

**examples/coordination/README.md**
- Complete usage instructions
- Configuration requirements
- Troubleshooting section
- Customization examples
- Advanced usage patterns

## Modified Files

src/RSIPI/io_api.py:
- Added time import
- Implemented set_output() method
- Implemented get_input() method with navigation of receive_variables
- Implemented pulse() method with blocking time.sleep()
- Comprehensive docstrings with examples

src/RSIPI/krl_api.py:
- Added time import
- Implemented wait_for_signal() with configurable polling
- Implemented signal_complete() method
- Implemented write_param() with slot normalization and validation
- Implemented read_param() with slot normalization and validation
- KRL code examples in all docstrings

## New Directories

templates/krl/
- 3 KRL program templates
- Comprehensive README with patterns and conventions

examples/coordination/
- 3 Python example scripts
- Complete usage documentation

## Design Decisions

**I/O Channel Numbering**: 1-based to match KUKA conventions
**Tech Variable Slots**: Validated 11-199 range (KUKA reserves 1-10)
**Blocking Operations**: wait_for_signal() blocks until the signal arrives or its configurable timeout expires; pulse() blocks for the pulse duration
**Error Handling**: Proper exceptions with clear messages
**Logging**: Debug/Info/Warning levels for all operations
**Documentation**: Every method includes KRL code examples
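
As an illustration of the blocking-with-timeout decision, here is a hedged sketch of a polling wait. `wait_for`, `read_signal`, and `poll_interval` are hypothetical names; the only behaviour taken from this commit is that the wait is polled, has a configurable timeout, and yields a falsy result on timeout (as the `if api.krl.wait_for_signal(...)` check in the example script implies).

```python
import time


def wait_for(read_signal, timeout: float = 5.0, poll_interval: float = 0.01) -> bool:
    """Poll `read_signal` until it returns a truthy value or `timeout` elapses."""
    deadline = time.monotonic() + timeout  # monotonic clock ignores wall-clock jumps
    while True:
        if read_signal():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


# Hypothetical signal source that becomes true on the third poll.
calls = {"n": 0}


def fake_signal() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


print(wait_for(fake_signal, timeout=1.0))      # signal arrives before the deadline
print(wait_for(lambda: False, timeout=0.05))   # never arrives, so the wait times out
```

Checking the signal before checking the deadline means a signal that is already set is reported even with a timeout of zero.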

## Phase 3 Status: COMPLETE

All planned features implemented:
- High-level Digital I/O API
- KRL state coordination helpers
- Parameter passing via Tech variables
- KRL code templates
- Python coordination examples
- Comprehensive documentation

Next: Phase 4 (Advanced Motion Control)
2026-01-17 00:38:32 +00:00


"""
Parameter Passing Example

Demonstrates bidirectional numerical data exchange between Python and KRL
using RSI Tech variables. Works with templates/krl/parameter_passing.src

Flow:
    1. KRL writes current position to Tech.T11-T16
    2. Python waits for data ready signal
    3. Python reads position from Tech.T
    4. Python calculates target and writes to Tech.C11-C13
    5. Python signals completion
    6. KRL reads target from Tech.C and executes motion

Usage:
    python 02_parameter_passing.py --config RSI_EthernetConfig.xml
"""

import argparse
import logging

from RSIPI import RSIAPI

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


def parameter_passing_example(config_file: str) -> None:
    """
    Execute parameter passing coordination with KRL program.

    Reads current position from KRL, calculates target position,
    and sends target back to KRL for execution.

    Args:
        config_file: Path to RSI configuration XML file
    """
    api = RSIAPI(config_file)

    try:
        logging.info("Starting RSI communication...")
        api.start()
        logging.info("✅ RSI started successfully")

        # Wait for KRL to signal that data is ready
        logging.info("Waiting for KRL data ready signal...")
        if api.krl.wait_for_signal(1, timeout=30.0):
            logging.info("✅ KRL signaled data ready!")

            # Read current position from Tech.T variables
            logging.info("Reading current position from KRL...")
            current_x = api.krl.read_param('T11')
            current_y = api.krl.read_param('T12')
            current_z = api.krl.read_param('T13')
            current_a = api.krl.read_param('T14')
            current_b = api.krl.read_param('T15')
            current_c = api.krl.read_param('T16')

            logging.info("Current position:")
            logging.info(f"  X: {current_x:.2f} mm")
            logging.info(f"  Y: {current_y:.2f} mm")
            logging.info(f"  Z: {current_z:.2f} mm")
            logging.info(f"  A: {current_a:.2f}°, B: {current_b:.2f}°, C: {current_c:.2f}°")

            # Calculate target position (example: move 100mm in X, 50mm in Y)
            logging.info("Calculating target position...")
            target_x = current_x + 100.0  # Move 100mm in X
            target_y = current_y + 50.0   # Move 50mm in Y
            target_z = current_z + 0.0    # Keep Z constant

            logging.info("Calculated target:")
            logging.info(f"  X: {target_x:.2f} mm (+100mm)")
            logging.info(f"  Y: {target_y:.2f} mm (+50mm)")
            logging.info(f"  Z: {target_z:.2f} mm (no change)")

            # Write target position to Tech.C variables for KRL to read
            logging.info("Writing target position to KRL...")
            api.krl.write_param('C11', target_x)
            api.krl.write_param('C12', target_y)
            api.krl.write_param('C13', target_z)
            logging.info("✅ Target position written to Tech.C")

            # Signal KRL that calculation is complete
            api.krl.signal_complete(1)
            logging.info("✅ Signaled KRL that target is ready")

            # KRL will now read target and execute motion
            logging.info("KRL will now execute motion to calculated target")
        else:
            logging.error("❌ Timeout waiting for KRL data ready signal")

    except KeyboardInterrupt:
        logging.warning("\n⚠️ Interrupted by user")
    except Exception as e:
        logging.error(f"❌ Error during parameter passing: {e}")
    finally:
        logging.info("Stopping RSI communication...")
        api.stop()
        logging.info("✅ API stopped successfully")


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description='Parameter Passing Example')
    parser.add_argument(
        '--config',
        type=str,
        default='RSI_EthernetConfig.xml',
        help='Path to RSI configuration file'
    )
    args = parser.parse_args()

    logging.info("=" * 60)
    logging.info("RSIPI - Parameter Passing Example")
    logging.info("=" * 60)
    logging.info(f"Config: {args.config}")
    logging.info("=" * 60)

    parameter_passing_example(args.config)

    logging.info("=" * 60)
    logging.info("Example complete!")
    logging.info("=" * 60)


if __name__ == '__main__':
    main()