diff --git a/README.md b/README.md index f390d61..7b7688e 100644 --- a/README.md +++ b/README.md @@ -1,120 +1,179 @@ -# RSIPI โ€“ Robot Sensor Interface Python Integration +# RSIPI: Robot Sensor Interface - Python Integration -RSIPI (Robot Sensor Interface Python Integration) is a comprehensive Python package designed to simplify and enhance the integration and control of KUKA robots using the Robot Sensor Interface (RSI). It provides intuitive APIs, real-time data visualisation, dynamic control capabilities, and easy logging, suitable for research, development, and deployment of advanced robotics applications. +RSIPI is a high-performance, Python-based communication and control system designed for real-time interfacing with KUKA robots using the Robot Sensor Interface (RSI) protocol. It provides both a robust **API** for developers and a powerful **Command Line Interface (CLI)** for researchers and engineers who need to monitor, control, and analyse robotic movements in real time. -## ๐Ÿš€ Features +--- -- **Real-Time Control:** Seamless communication with KUKA RSI. -- **Dynamic Variable Updates:** Update robot parameters dynamically during runtime. -- **CSV Logging:** Record all robot interactions and sensor data for easy analysis. -- **Live Graphing:** Real-time visualisation of position, velocity, acceleration, and force. -- **Command-Line Interface (CLI):** User-friendly terminal-based robot interaction. -- **Python API:** Easily integrate RSIPI into custom Python applications. +## ๐Ÿ“„ Description -## ๐Ÿ“‚ Project Structure +RSIPI allows users to: +- Communicate with KUKA robots using the RSI XML-based protocol. +- Dynamically update control variables (TCP position, joint angles, I/O, external axes, etc.). +- Log and visualise robot movements. +- Analyse motion data and compare planned vs actual trajectories. -``` -RSI-PI/ -โ”œโ”€โ”€ src/ -โ”‚ โ””โ”€โ”€ RSIPI/ -โ”‚ โ”œโ”€โ”€ rsi_api.py -โ”‚ โ”œโ”€โ”€ rsi_client.py -โ”‚ โ”œโ”€โ”€ network_process.py -โ”‚ โ”œโ”€โ”€ config_parser.py -โ”‚ โ”œโ”€โ”€ xml_handler.py -โ”‚ โ”œโ”€โ”€ rsi_config.py -โ”‚ โ”œโ”€โ”€ cli.py -โ”‚ โ”œโ”€โ”€ graphing.py -โ”‚ โ””โ”€โ”€ test_rsipi.py -โ”œโ”€โ”€ RSI_EthernetConfig.xml -โ”œโ”€โ”€ setup.py -โ””โ”€โ”€ README.md -``` +### Target Audience +- **Researchers** working on advanced robotic applications, control algorithms, and feedback systems. +- **Engineers** developing robotic workflows or automated processes. +- **Educators** using real robots in coursework or lab environments. +- **Students** learning about robot control systems and data-driven motion planning. -## โš™๏ธ Installation +--- -To install RSIPI, clone the repository and use `pip`: +## ๐Ÿ“Š Features +- Real-time network communication with KUKA RSI over UDP. +- Structured logging to CSV with British date formatting. +- Background execution and live variable updates. +- Fully-featured Python API for scripting or external integration. +- CLI for interactive control and live monitoring. +- Real-time and post-analysis graphing. +- Basic trajectory planning and playback (Cartesian and Joint interpolation). -```bash -git clone -cd RSI-PI -pip install . -``` +--- -## โ–ถ๏ธ Getting Started - -**Quick example:** +## ๐Ÿ“Š API Overview (`rsi_api.py`) +### Initialization ```python -from RSIPI import RSIAPI -from time import sleep +from src.RSIPI import rsi_api +api = rsi_api.RSIAPI(config_path='RSI_EthernetConfig.xml') +``` -# Initialise RSIAPI -api = RSIAPI("RSI_EthernetConfig.xml") +### Methods +| Method | CLI | API | Description | +|-------|-----|-----|-------------| +| `start_rsi()` | โœ… | โœ… | Starts RSI communication (non-blocking). | +| `stop_rsi()` | โœ… | โœ… | Stops RSI communication. | +| `update_variable(path, value)` | โœ… | โœ… | Dynamically updates a send variable (e.g. `RKorr.X`). | +| `get_variable(path)` | โœ… | โœ… | Retrieves the latest value of any variable. | +| `enable_logging(include=None, exclude=None)` | โŒ | โœ… | Starts CSV logging in background. | +| `disable_logging()` | โŒ | โœ… | Stops CSV logging. | +| `enable_graphing(mode='tcp')` | โŒ | โœ… | Enables real-time graphing (TCP or joint). | +| `disable_graphing()` | โŒ | โœ… | Disables graphing. | +| `plan_linear_cartesian(start, end, steps)` | โŒ | โœ… | Creates a Cartesian path. | +| `plan_linear_joint(start, end, steps)` | โŒ | โœ… | Creates a joint-space path. | +| `execute_trajectory(traj, delay=0.012)` | โŒ | โœ… | Sends a trajectory to robot using RSI corrections. | -# Start RSI communication +--- + +## ๐Ÿ”ง CLI Overview (`rsi_cli.py`) + +Start the CLI: +```bash +python main.py --cli +``` + +### Available Commands: +| Command | Description | +|---------|-------------| +| `start` | Starts the RSI client. | +| `stop` | Stops RSI communication. | +| `set ` | Updates a send variable. | +| `get ` | Displays the current value of a variable. | +| `graph on/off` | Enables/disables live graphing. | +| `log on/off` | Enables/disables logging. | +| `status` | Displays current status. | +| `exit` | Exits the CLI. | + +--- + +## ๐Ÿ“ƒ Examples + +### Start RSI and update Cartesian coordinates +```python api.start_rsi() - -# Dynamically update a variable -api.update_variable("EStr", "RSIPI Test") - -# Log data to CSV -api.start_logging("robot_data.csv") - -sleep(10) - -# Stop logging and RSI communication -api.stop_logging() -api.stop_rsi() +api.update_variable('RKorr.X', 100.0) +api.update_variable('RKorr.Y', 200.0) +api.update_variable('RKorr.Z', 300.0) ``` -## ๐Ÿ’ป Command-Line Interface - -Start the CLI for interactive robot control: - -```bash -python src/RSIPI/rsi_cli.py -``` - -Example CLI commands: - -- `start`: Start RSI communication -- `set EStr HelloWorld`: Update variable -- `log start data.csv`: Start logging -- `graph start position`: Start live graphing -- `stop`: Stop RSI communication - -## ๐Ÿ“Š Graphing and Visualisation - -RSIPI supports real-time plotting and analysis: - +### Retrieve joint positions ```python -api.start_graphing(mode="position") # Real-time position graph +a1 = api.get_variable('AIPos.A1') ``` -## ๐Ÿงช Running Tests - -Tests are provided to verify all functionalities: +### Plan and execute Cartesian trajectory +```python +start = {'X': 0, 'Y': 0, 'Z': 0, 'A': 0, 'B': 0, 'C': 0} +end = {'X': 100, 'Y': 100, 'Z': 0, 'A': 0, 'B': 0, 'C': 0} +traj = api.plan_linear_cartesian(start, end, steps=50) +api.execute_trajectory(traj) +``` +### CLI Sample ```bash -python -m unittest discover -s src/RSIPI -p "test_*.py" +> start +> set RKorr.X 150 +> set DiO 255 +> get AIPos.A1 +> log on +> graph on +> stop ``` -## ๐Ÿ“ Citation +--- -If you use RSIPI in your research, please cite it: +## ๐Ÿ“ค Output & Logs +- CSV logs saved to `logs/` folder. +- Each log includes timestamp, sent and received values in individual columns. +- Graphs can be saved manually as PNG/PDF from the visualisation window. +--- + +## ๐Ÿš€ Getting Started +1. Connect robot and PC via Ethernet. +2. Deploy KUKA RSI program with matching configuration. +3. Install dependencies: +```bash +pip install -r requirements.txt ``` -@misc{YourName2025RSIPI, - author = {Your Name}, - title = {RSIPI: Robot Sensor Interface Python Integration}, +4. Run `main.py` and use CLI or import API in your Python program. + +--- + +## ๐Ÿ”– Citation +If you use RSIPI in your research, please cite: +``` +@software{rsipi2025, + author = {RSIPI Development Team}, + title = {RSIPI: Robot Sensor Interface - Python Integration}, year = {2025}, - publisher = {GitHub}, - journal = {GitHub repository}, - url = {https://github.com/yourusername/RSIPI}, + url = {https://github.com/your-org/rsipi}, + note = {Accessed: [insert date]} } ``` -## ๐Ÿ“š License +--- + +## โš–๏ธ License +RSIPI is licensed under the MIT License: + +``` +MIT License + +Copyright (c) 2025 RSIPI Developers + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +``` + +--- + +## ๐Ÿšง Disclaimer +RSIPI is designed for research and experimental purposes only. Ensure safe robot operation with appropriate safety measures. -RSIPI is released under the [MIT License](LICENSE). \ No newline at end of file diff --git a/src/RSIPI/rsi_api.py b/src/RSIPI/rsi_api.py index 210361f..dc7a17f 100644 --- a/src/RSIPI/rsi_api.py +++ b/src/RSIPI/rsi_api.py @@ -10,6 +10,8 @@ from .krl_to_csv_parser import KRLParser from .inject_rsi_to_krl import inject_rsi_to_krl import threading # (Put this at the top of the file) from threading import Thread +from .trajectory_planner import generate_trajectory, execute_trajectory + class RSIAPI: """RSI API for programmatic control, including alerts, logging, graphing, and data retrieval.""" @@ -20,6 +22,7 @@ class RSIAPI: self.graph_process = None # Store graphing process self.graphing_instance = None self.graph_thread = None# + self.trajectory_queue = [] import threading # (Put this at the top of the file) @@ -259,3 +262,99 @@ class RSIAPI: return f"โœ… RSI successfully injected into {output_path}" except Exception as e: return f"โŒ RSI injection failed: {e}" + + def generate_trajectory(self, start, end, steps=100, space="cartesian"): + """Generates a linear trajectory (Cartesian or Joint).""" + return generate_trajectory(start, end, steps, space) + + def execute_trajectory(self, trajectory, space="cartesian", rate=0.012): + """Executes a trajectory using live updates.""" + return execute_trajectory(self, trajectory, space, rate) + + def queue_trajectory(self, trajectory, space="cartesian", rate=0.012): + """Adds a trajectory to the internal queue.""" + self.trajectory_queue.append({ + "trajectory": trajectory, + "space": space, + "rate": rate, + }) + + def clear_trajectory_queue(self): + """Clears all queued trajectories.""" + self.trajectory_queue.clear() + + def get_trajectory_queue(self): + """Returns current queued trajectories (metadata only).""" + return [ + {"space": item["space"], "steps": len(item["trajectory"]), "rate": item["rate"]} + for item in self.trajectory_queue + ] + + def execute_queued_trajectories(self): + """Executes all queued trajectories in order.""" + for item in self.trajectory_queue: + self.execute_trajectory(item["trajectory"], item["space"], item["rate"]) + self.clear_trajectory_queue() + + def export_movement_data(self, filename="movement_log.csv"): + """ + Exports recorded movement data (if available) to a CSV file. + Assumes self.client.logger has stored entries. + """ + if not hasattr(self.client, "logger") or self.client.logger is None: + raise RuntimeError("No logger attached to RSI client.") + + data = self.client.get_movement_data() + if not data: + raise RuntimeError("No data available to export.") + + import pandas as pd + df = pd.DataFrame(data) + df.to_csv(filename, index=False) + return f"โœ… Movement data exported to {filename}" + + def compare_test_runs(self, file1, file2): + """ + Compares two test run CSV files. + Returns a summary of average and max deviation for each axis. + """ + import pandas as pd + + df1 = pd.read_csv(file1) + df2 = pd.read_csv(file2) + + shared_cols = [col for col in df1.columns if col in df2.columns and col.startswith("Receive.RIst")] + diffs = {} + + for col in shared_cols: + delta = abs(df1[col] - df2[col]) + diffs[col] = { + "mean_diff": delta.mean(), + "max_diff": delta.max(), + } + + return diffs + + def generate_report(self, file, output="report.txt"): + """ + Generates a summary report from a CSV log (e.g., trajectory stats). + """ + import pandas as pd + + df = pd.read_csv(file) + position_cols = [col for col in df.columns if "Receive.RIst" in col or "Send.RKorr" in col] + report_lines = [] + + report_lines.append(f"๐Ÿ“„ RSIPI Movement Report") + report_lines.append(f"Source File: {file}") + report_lines.append(f"Total entries: {len(df)}\n") + + for col in position_cols: + stats = df[col].describe() + report_lines.append( + f"{col}: mean={stats['mean']:.2f}, std={stats['std']:.2f}, min={stats['min']:.2f}, max={stats['max']:.2f}") + + with open(output, "w") as f: + f.write("\n".join(report_lines)) + + return f"โœ… Report generated: {output}" diff --git a/src/RSIPI/rsi_cli.py b/src/RSIPI/rsi_cli.py index 7c4f6df..bb53771 100644 --- a/src/RSIPI/rsi_cli.py +++ b/src/RSIPI/rsi_cli.py @@ -104,6 +104,63 @@ class RSICommandLineInterface: elif cmd == "log" and len(parts) == 2 and parts[1] == "status": active = self.client.is_logging_active() print(f"๐Ÿ“‹ Logging is {'ACTIVE' if active else 'INACTIVE'}") + elif cmd == "move_cartesian" and len(parts) >= 3: + start_dict = self.parse_pose_string(parts[1]) + end_dict = self.parse_pose_string(parts[2]) + steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int) + rate = self.extract_optional_value(parts, "rate", default=0.012, cast_type=float) + + trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="cartesian") + self.client.execute_trajectory(trajectory, space="cartesian", rate=rate) + + elif cmd == "move_joint" and len(parts) >= 3: + start_dict = self.parse_pose_string(parts[1]) + end_dict = self.parse_pose_string(parts[2]) + steps = self.extract_optional_value(parts, "steps", default=50, cast_type=int) + rate = self.extract_optional_value(parts, "rate", default=0.012, cast_type=float) + + trajectory = self.client.generate_trajectory(start_dict, end_dict, steps=steps, space="joint") + self.client.execute_trajectory(trajectory, space="joint", rate=rate) + elif cmd == "queue_cartesian" and len(parts) >= 3: + start = self.parse_pose_string(parts[1]) + end = self.parse_pose_string(parts[2]) + steps = self.extract_optional_value(parts, "steps", 50, int) + rate = self.extract_optional_value(parts, "rate", 0.012, float) + traj = self.client.generate_trajectory(start, end, steps, "cartesian") + self.client.queue_trajectory(traj, "cartesian", rate) + + elif cmd == "queue_joint" and len(parts) >= 3: + start = self.parse_pose_string(parts[1]) + end = self.parse_pose_string(parts[2]) + steps = self.extract_optional_value(parts, "steps", 50, int) + rate = self.extract_optional_value(parts, "rate", 0.012, float) + traj = self.client.generate_trajectory(start, end, steps, "joint") + self.client.queue_trajectory(traj, "joint", rate) + + elif cmd == "execute_queue": + self.client.execute_queued_trajectories() + + elif cmd == "clear_queue": + self.client.clear_trajectory_queue() + + elif cmd == "show_queue": + queue = self.client.get_trajectory_queue() + print(f"๐Ÿงพ Trajectory Queue: {len(queue)} items") + for i, q in enumerate(queue): + print(f" {i + 1}. {q['space']} | {q['steps']} steps | {q['rate']}s") + elif cmd == "export_movement_data" and len(parts) == 2: + result = self.client.export_movement_data(parts[1]) + print(result) + elif cmd == "compare_test_runs" and len(parts) == 3: + result = self.client.compare_test_runs(parts[1], parts[2]) + print("๐Ÿ“Š Comparison Results:") + for key, stats in result.items(): + print(f"{key}: mean_diff={stats['mean_diff']:.3f}, max_diff={stats['max_diff']:.3f}") + elif cmd == "generate_report" and len(parts) in [2, 3]: + output = parts[2] if len(parts) == 3 else "report.txt" + result = self.client.generate_report(parts[1], output) + print(result) + else: print("โŒ Unknown command. Type 'help' for a list of commands.") @@ -168,6 +225,18 @@ Available Commands: show all - Show all current send and receive variables show live - Show real-time TCP, force, and IPOC values log status - Display whether logging is currently active + move_cartesian [steps=50] [rate=0.012] + e.g., X=0,Y=0,Z=500 A=100,Y=0,Z=500 steps=100 rate=0.012 + move_joint [steps=50] [rate=0.012] + e.g., A1=0,... A1=90,... steps=60 + queue_cartesian [steps=50] [rate=0.012] - Queue linear Cartesian trajectory + queue_joint [steps=50] [rate=0.012] - Queue linear Joint trajectory + show_queue - Show queued trajectory segments + clear_queue - Clear all queued trajectories + execute_queue - Execute all queued motions + export_movement_data - Export logged motion data to CSV + compare_test_runs - Compare 2 test logs (e.g. deviation) + generate_report [out.txt] - Create a movement analysis report """) @@ -204,6 +273,18 @@ Available Commands: except Exception as e: print(f"โŒ Failed to inject RSI commands: {e}") + def extract_optional_value(self, parts, key, default=0, cast_type=float): + """ + Extracts optional arguments like 'steps=100' or 'rate=0.01' + """ + for part in parts[3:]: # skip cmd, start, end + if part.startswith(f"{key}="): + try: + return cast_type(part.split("=")[1]) + except ValueError: + return default + return default + if __name__ == "__main__": config_file = "RSI_EthernetConfig.xml" cli = RSICommandLineInterface(config_file) diff --git a/src/RSIPI/rsi_client.py b/src/RSIPI/rsi_client.py index 41c0edc..6378345 100644 --- a/src/RSIPI/rsi_client.py +++ b/src/RSIPI/rsi_client.py @@ -117,13 +117,13 @@ class RSIClient: self.network_process.start() + def get_movement_data(self): - # Mock method - return [ - {"time": 0, "X": 0, "Y": 0, "Z": 0, "A1": 0.1, "A2": 0.0}, - {"time": 1, "X": 10, "Y": 5, "Z": 0, "A1": 0.2, "A2": 0.0}, - {"time": 2, "X": 20, "Y": 10, "Z": 0, "A1": 0.3, "A2": 0.0} - ] + """Returns a list of all logged data entries.""" + if hasattr(self, "logger") and self.logger: + return self.logger.get_all_records() + return [] + if __name__ == "__main__": config_file = "RSI_EthernetConfig.xml" diff --git a/src/RSIPI/trajectory_planner.py b/src/RSIPI/trajectory_planner.py new file mode 100644 index 0000000..8bb3cc0 --- /dev/null +++ b/src/RSIPI/trajectory_planner.py @@ -0,0 +1,41 @@ +import numpy as np + + +def generate_trajectory(start, end, steps=100, space="cartesian"): + """ + Generates a linear trajectory from start to end over `steps`. + Supported spaces: 'cartesian', 'joint' + """ + if not isinstance(start, dict) or not isinstance(end, dict): + raise ValueError("Start and end must be dictionaries.") + + keys = sorted(start.keys()) + if keys != sorted(end.keys()): + raise ValueError("Start and end must have matching keys.") + + traj = [] + for i in range(steps): + point = {} + for key in keys: + point[key] = float(np.linspace(start[key], end[key], steps)[i]) + traj.append(point) + + return traj + + +def execute_trajectory(api, trajectory, space="cartesian", rate=0.012): + """ + Sends the trajectory to the robot using the RSIPI API. + - space: 'cartesian' or 'joint' + - rate: time between points in seconds (e.g., 0.012 for 12 ms) + """ + import time + + for point in trajectory: + if space == "cartesian": + api.update_cartesian(**point) + elif space == "joint": + api.update_joints(**point) + else: + raise ValueError("Space must be 'cartesian' or 'joint'") + time.sleep(rate)