Added trajectory planning to cli and api. Updated readme.

This commit is contained in:
Adam 2025-04-02 22:46:21 +01:00
parent e54a2cc7a9
commit 259dd55c84
5 changed files with 375 additions and 95 deletions

237
README.md
View File

@ -1,120 +1,179 @@
# RSIPI Robot Sensor Interface Python Integration
# RSIPI: Robot Sensor Interface - Python Integration
RSIPI (Robot Sensor Interface Python Integration) is a comprehensive Python package designed to simplify and enhance the integration and control of KUKA robots using the Robot Sensor Interface (RSI). It provides intuitive APIs, real-time data visualisation, dynamic control capabilities, and easy logging, suitable for research, development, and deployment of advanced robotics applications.
RSIPI is a high-performance, Python-based communication and control system designed for real-time interfacing with KUKA robots using the Robot Sensor Interface (RSI) protocol. It provides both a robust **API** for developers and a powerful **Command Line Interface (CLI)** for researchers and engineers who need to monitor, control, and analyse robotic movements in real time.
## 🚀 Features
---
- **Real-Time Control:** Seamless communication with KUKA RSI.
- **Dynamic Variable Updates:** Update robot parameters dynamically during runtime.
- **CSV Logging:** Record all robot interactions and sensor data for easy analysis.
- **Live Graphing:** Real-time visualisation of position, velocity, acceleration, and force.
- **Command-Line Interface (CLI):** User-friendly terminal-based robot interaction.
- **Python API:** Easily integrate RSIPI into custom Python applications.
## 📄 Description
## 📂 Project Structure
RSIPI allows users to:
- Communicate with KUKA robots using the RSI XML-based protocol.
- Dynamically update control variables (TCP position, joint angles, I/O, external axes, etc.).
- Log and visualise robot movements.
- Analyse motion data and compare planned vs actual trajectories.
```
RSI-PI/
├── src/
│ └── RSIPI/
│ ├── rsi_api.py
│ ├── rsi_client.py
│ ├── network_process.py
│ ├── config_parser.py
│ ├── xml_handler.py
│ ├── rsi_config.py
│ ├── cli.py
│ ├── graphing.py
│ └── test_rsipi.py
├── RSI_EthernetConfig.xml
├── setup.py
└── README.md
```
### Target Audience
- **Researchers** working on advanced robotic applications, control algorithms, and feedback systems.
- **Engineers** developing robotic workflows or automated processes.
- **Educators** using real robots in coursework or lab environments.
- **Students** learning about robot control systems and data-driven motion planning.
## ⚙️ Installation
---
To install RSIPI, clone the repository and use `pip`:
## 📊 Features
- Real-time network communication with KUKA RSI over UDP.
- Structured logging to CSV with British date formatting.
- Background execution and live variable updates.
- Fully-featured Python API for scripting or external integration.
- CLI for interactive control and live monitoring.
- Real-time and post-analysis graphing.
- Basic trajectory planning and playback (Cartesian and Joint interpolation).
```bash
git clone <your-repo-url>
cd RSI-PI
pip install .
```
---
## ▶️ Getting Started
**Quick example:**
## 📊 API Overview (`rsi_api.py`)
### Initialization
```python
from RSIPI import RSIAPI
from time import sleep
from src.RSIPI import rsi_api
api = rsi_api.RSIAPI(config_path='RSI_EthernetConfig.xml')
```
# Initialise RSIAPI
api = RSIAPI("RSI_EthernetConfig.xml")
### Methods
| Method | CLI | API | Description |
|-------|-----|-----|-------------|
| `start_rsi()` | ✅ | ✅ | Starts RSI communication (non-blocking). |
| `stop_rsi()` | ✅ | ✅ | Stops RSI communication. |
| `update_variable(path, value)` | ✅ | ✅ | Dynamically updates a send variable (e.g. `RKorr.X`). |
| `get_variable(path)` | ✅ | ✅ | Retrieves the latest value of any variable. |
| `enable_logging(include=None, exclude=None)` | ❌ | ✅ | Starts CSV logging in background. |
| `disable_logging()` | ❌ | ✅ | Stops CSV logging. |
| `enable_graphing(mode='tcp')` | ❌ | ✅ | Enables real-time graphing (TCP or joint). |
| `disable_graphing()` | ❌ | ✅ | Disables graphing. |
| `plan_linear_cartesian(start, end, steps)` | ❌ | ✅ | Creates a Cartesian path. |
| `plan_linear_joint(start, end, steps)` | ❌ | ✅ | Creates a joint-space path. |
| `execute_trajectory(traj, delay=0.012)` | ❌ | ✅ | Sends a trajectory to robot using RSI corrections. |
# Start RSI communication
---
## 🔧 CLI Overview (`rsi_cli.py`)
Start the CLI:
```bash
python main.py --cli
```
### Available Commands:
| Command | Description |
|---------|-------------|
| `start` | Starts the RSI client. |
| `stop` | Stops RSI communication. |
| `set <variable> <value>` | Updates a send variable. |
| `get <variable>` | Displays the current value of a variable. |
| `graph on/off` | Enables/disables live graphing. |
| `log on/off` | Enables/disables logging. |
| `status` | Displays current status. |
| `exit` | Exits the CLI. |
---
## 📃 Examples
### Start RSI and update Cartesian coordinates
```python
api.start_rsi()
# Dynamically update a variable
api.update_variable("EStr", "RSIPI Test")
# Log data to CSV
api.start_logging("robot_data.csv")
sleep(10)
# Stop logging and RSI communication
api.stop_logging()
api.stop_rsi()
api.update_variable('RKorr.X', 100.0)
api.update_variable('RKorr.Y', 200.0)
api.update_variable('RKorr.Z', 300.0)
```
## 💻 Command-Line Interface
Start the CLI for interactive robot control:
```bash
python src/RSIPI/rsi_cli.py
```
Example CLI commands:
- `start`: Start RSI communication
- `set EStr HelloWorld`: Update variable
- `log start data.csv`: Start logging
- `graph start position`: Start live graphing
- `stop`: Stop RSI communication
## 📊 Graphing and Visualisation
RSIPI supports real-time plotting and analysis:
### Retrieve joint positions
```python
api.start_graphing(mode="position") # Real-time position graph
a1 = api.get_variable('AIPos.A1')
```
## 🧪 Running Tests
Tests are provided to verify all functionalities:
### Plan and execute Cartesian trajectory
```python
start = {'X': 0, 'Y': 0, 'Z': 0, 'A': 0, 'B': 0, 'C': 0}
end = {'X': 100, 'Y': 100, 'Z': 0, 'A': 0, 'B': 0, 'C': 0}
traj = api.plan_linear_cartesian(start, end, steps=50)
api.execute_trajectory(traj)
```
### CLI Sample
```bash
python -m unittest discover -s src/RSIPI -p "test_*.py"
> start
> set RKorr.X 150
> set DiO 255
> get AIPos.A1
> log on
> graph on
> stop
```
## 📝 Citation
---
If you use RSIPI in your research, please cite it:
## 📤 Output & Logs
- CSV logs saved to `logs/` folder.
- Each log includes timestamp, sent and received values in individual columns.
- Graphs can be saved manually as PNG/PDF from the visualisation window.
---
## 🚀 Getting Started
1. Connect robot and PC via Ethernet.
2. Deploy KUKA RSI program with matching configuration.
3. Install dependencies:
```bash
pip install -r requirements.txt
```
@misc{YourName2025RSIPI,
author = {Your Name},
title = {RSIPI: Robot Sensor Interface Python Integration},
4. Run `main.py` and use CLI or import API in your Python program.
---
## 🔖 Citation
If you use RSIPI in your research, please cite:
```
@software{rsipi2025,
author = {RSIPI Development Team},
title = {RSIPI: Robot Sensor Interface - Python Integration},
year = {2025},
publisher = {GitHub},
journal = {GitHub repository},
url = {https://github.com/yourusername/RSIPI},
url = {https://github.com/your-org/rsipi},
note = {Accessed: [insert date]}
}
```
## 📚 License
---
## ⚖️ License
RSIPI is licensed under the MIT License:
```
MIT License
Copyright (c) 2025 RSIPI Developers
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
```
---
## 🚧 Disclaimer
RSIPI is designed for research and experimental purposes only. Ensure safe robot operation with appropriate safety measures.
RSIPI is released under the [MIT License](LICENSE).

View File

@ -10,6 +10,8 @@ from .krl_to_csv_parser import KRLParser
from .inject_rsi_to_krl import inject_rsi_to_krl
import threading # (Put this at the top of the file)
from threading import Thread
from .trajectory_planner import generate_trajectory, execute_trajectory
class RSIAPI:
"""RSI API for programmatic control, including alerts, logging, graphing, and data retrieval."""
@ -20,6 +22,7 @@ class RSIAPI:
self.graph_process = None # Store graphing process
self.graphing_instance = None
self.graph_thread = None#
self.trajectory_queue = []
import threading # (Put this at the top of the file)
@ -259,3 +262,99 @@ class RSIAPI:
return f"✅ RSI successfully injected into {output_path}"
except Exception as e:
return f"❌ RSI injection failed: {e}"
def generate_trajectory(self, start, end, steps=100, space="cartesian"):
"""Generates a linear trajectory (Cartesian or Joint)."""
return generate_trajectory(start, end, steps, space)
def execute_trajectory(self, trajectory, space="cartesian", rate=0.012):
"""Executes a trajectory using live updates."""
return execute_trajectory(self, trajectory, space, rate)
def queue_trajectory(self, trajectory, space="cartesian", rate=0.012):
"""Adds a trajectory to the internal queue."""
self.trajectory_queue.append({
"trajectory": trajectory,
"space": space,
"rate": rate,
})
def clear_trajectory_queue(self):
"""Clears all queued trajectories."""
self.trajectory_queue.clear()
def get_trajectory_queue(self):
"""Returns current queued trajectories (metadata only)."""
return [
{"space": item["space"], "steps": len(item["trajectory"]), "rate": item["rate"]}
for item in self.trajectory_queue
]
def execute_queued_trajectories(self):
"""Executes all queued trajectories in order."""
for item in self.trajectory_queue:
self.execute_trajectory(item["trajectory"], item["space"], item["rate"])
self.clear_trajectory_queue()
def export_movement_data(self, filename="movement_log.csv"):
"""
Exports recorded movement data (if available) to a CSV file.
Assumes self.client.logger has stored entries.
"""
if not hasattr(self.client, "logger") or self.client.logger is None:
raise RuntimeError("No logger attached to RSI client.")
data = self.client.get_movement_data()
if not data:
raise RuntimeError("No data available to export.")
import pandas as pd
df = pd.DataFrame(data)
df.to_csv(filename, index=False)
return f"✅ Movement data exported to {filename}"
def compare_test_runs(self, file1, file2):
"""
Compares two test run CSV files.
Returns a summary of average and max deviation for each axis.
"""
import pandas as pd
df1 = pd.read_csv(file1)
df2 = pd.read_csv(file2)
shared_cols = [col for col in df1.columns if col in df2.columns and col.startswith("Receive.RIst")]
diffs = {}
for col in shared_cols:
delta = abs(df1[col] - df2[col])
diffs[col] = {
"mean_diff": delta.mean(),
"max_diff": delta.max(),
}
return diffs
def generate_report(self, file, output="report.txt"):
"""
Generates a summary report from a CSV log (e.g., trajectory stats).
"""
import pandas as pd
df = pd.read_csv(file)
position_cols = [col for col in df.columns if "Receive.RIst" in col or "Send.RKorr" in col]
report_lines = []
report_lines.append(f"📄 RSIPI Movement Report")
report_lines.append(f"Source File: {file}")
report_lines.append(f"Total entries: {len(df)}\n")
for col in position_cols:
stats = df[col].describe()
report_lines.append(
f"{col}: mean={stats['mean']:.2f}, std={stats['std']:.2f}, min={stats['min']:.2f}, max={stats['max']:.2f}")
with open(output, "w") as f:
f.write("\n".join(report_lines))
return f"✅ Report generated: {output}"

View File

@ -104,6 +104,63 @@ class RSICommandLineInterface:
elif cmd == "log" and len(parts) == 2 and parts[1] == "status":
active = self.client.is_logging_active()
print(f"📋 Logging is {'ACTIVE' if active else 'INACTIVE'}")
elif cmd == "move_cartesian" and len(parts) >= 3:
start_dict = self.parse_pose_string(parts[1])
end_dict = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int)
rate = self.extract_optional_value(parts, "rate", default=0.012, cast_type=float)
trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="cartesian")
self.client.execute_trajectory(trajectory, space="cartesian", rate=rate)
elif cmd == "move_joint" and len(parts) >= 3:
start_dict = self.parse_pose_string(parts[1])
end_dict = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int)
rate = self.extract_optional_value(parts, "rate", default=0.012, cast_type=float)
trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="joint")
self.client.execute_trajectory(trajectory, space="joint", rate=rate)
elif cmd == "queue_cartesian" and len(parts) >= 3:
start = self.parse_pose_string(parts[1])
end = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", 50, int)
rate = self.extract_optional_value(parts, "rate", 0.012, float)
traj = self.client.generate_trajectory(start, end, steps, "cartesian")
self.client.queue_trajectory(traj, "cartesian", rate)
elif cmd == "queue_joint" and len(parts) >= 3:
start = self.parse_pose_string(parts[1])
end = self.parse_pose_string(parts[2])
steps = self.extract_optional_value(parts, "steps", 50, int)
rate = self.extract_optional_value(parts, "rate", 0.012, float)
traj = self.client.generate_trajectory(start, end, steps, "joint")
self.client.queue_trajectory(traj, "joint", rate)
elif cmd == "execute_queue":
self.client.execute_queued_trajectories()
elif cmd == "clear_queue":
self.client.clear_trajectory_queue()
elif cmd == "show_queue":
queue = self.client.get_trajectory_queue()
print(f"🧾 Trajectory Queue: {len(queue)} items")
for i, q in enumerate(queue):
print(f" {i + 1}. {q['space']} | {q['steps']} steps | {q['rate']}s")
elif cmd == "export_movement_data" and len(parts) == 2:
result = self.client.export_movement_data(parts[1])
print(result)
elif cmd == "compare_test_runs" and len(parts) == 3:
result = self.client.compare_test_runs(parts[1], parts[2])
print("📊 Comparison Results:")
for key, stats in result.items():
print(f"{key}: mean_diff={stats['mean_diff']:.3f}, max_diff={stats['max_diff']:.3f}")
elif cmd == "generate_report" and len(parts) in [2, 3]:
output = parts[2] if len(parts) == 3 else "report.txt"
result = self.client.generate_report(parts[1], output)
print(result)
else:
print("❌ Unknown command. Type 'help' for a list of commands.")
@ -168,6 +225,18 @@ Available Commands:
show all - Show all current send and receive variables
show live - Show real-time TCP, force, and IPOC values
log status - Display whether logging is currently active
move_cartesian <start> <end> [steps=50] [rate=0.012]
e.g., X=0,Y=0,Z=500 A=100,Y=0,Z=500 steps=100 rate=0.012
move_joint <start> <end> [steps=50] [rate=0.012]
e.g., A1=0,... A1=90,... steps=60
queue_cartesian <start> <end> [steps=50] [rate=0.012] - Queue linear Cartesian trajectory
queue_joint <start> <end> [steps=50] [rate=0.012] - Queue linear Joint trajectory
show_queue - Show queued trajectory segments
clear_queue - Clear all queued trajectories
execute_queue - Execute all queued motions
export_movement_data <csvfile> - Export logged motion data to CSV
compare_test_runs <file1> <file2> - Compare 2 test logs (e.g. deviation)
generate_report <input.csv> [out.txt] - Create a movement analysis report
""")
@ -204,6 +273,18 @@ Available Commands:
except Exception as e:
print(f"❌ Failed to inject RSI commands: {e}")
def extract_optional_value(self, parts, key, default=0, cast_type=float):
"""
Extracts optional arguments like 'steps=100' or 'rate=0.01'
"""
for part in parts[3:]: # skip cmd, start, end
if part.startswith(f"{key}="):
try:
return cast_type(part.split("=")[1])
except ValueError:
return default
return default
if __name__ == "__main__":
config_file = "RSI_EthernetConfig.xml"
cli = RSICommandLineInterface(config_file)

View File

@ -117,13 +117,13 @@ class RSIClient:
self.network_process.start()
def get_movement_data(self):
# Mock method
return [
{"time": 0, "X": 0, "Y": 0, "Z": 0, "A1": 0.1, "A2": 0.0},
{"time": 1, "X": 10, "Y": 5, "Z": 0, "A1": 0.2, "A2": 0.0},
{"time": 2, "X": 20, "Y": 10, "Z": 0, "A1": 0.3, "A2": 0.0}
]
"""Returns a list of all logged data entries."""
if hasattr(self, "logger") and self.logger:
return self.logger.get_all_records()
return []
if __name__ == "__main__":
config_file = "RSI_EthernetConfig.xml"

View File

@ -0,0 +1,41 @@
import numpy as np
def generate_trajectory(start, end, steps=100, space="cartesian"):
"""
Generates a linear trajectory from start to end over `steps`.
Supported spaces: 'cartesian', 'joint'
"""
if not isinstance(start, dict) or not isinstance(end, dict):
raise ValueError("Start and end must be dictionaries.")
keys = sorted(start.keys())
if keys != sorted(end.keys()):
raise ValueError("Start and end must have matching keys.")
traj = []
for i in range(steps):
point = {}
for key in keys:
point[key] = float(np.linspace(start[key], end[key], steps)[i])
traj.append(point)
return traj
def execute_trajectory(api, trajectory, space="cartesian", rate=0.012):
"""
Sends the trajectory to the robot using the RSIPI API.
- space: 'cartesian' or 'joint'
- rate: time between points in seconds (e.g., 0.012 for 12 ms)
"""
import time
for point in trajectory:
if space == "cartesian":
api.update_cartesian(**point)
elif space == "joint":
api.update_joints(**point)
else:
raise ValueError("Space must be 'cartesian' or 'joint'")
time.sleep(rate)