Currently, the network is in a working state.

In the middle of testing all methods.
This commit is contained in:
Adam 2025-03-26 20:21:56 +00:00
parent 7006d194de
commit 45a36b373c
10 changed files with 264 additions and 17 deletions

View File

@ -1,3 +1,4 @@
GNU AFFERO GENERAL PUBLIC LICENSE GNU AFFERO GENERAL PUBLIC LICENSE
Version 3, 19 November 2007 Version 3, 19 November 2007

120
README.md
View File

@ -1,2 +1,120 @@
# RSI-PI # RSIPI Robot Sensor Interface Python Integration
RSIPI (Robot Sensor Interface Python Integration) is a comprehensive Python package designed to simplify and enhance the integration and control of KUKA robots using the Robot Sensor Interface (RSI). It provides intuitive APIs, real-time data visualisation, dynamic control capabilities, and easy logging, suitable for research, development, and deployment of advanced robotics applications.
## 🚀 Features
- **Real-Time Control:** Seamless communication with KUKA RSI.
- **Dynamic Variable Updates:** Update robot parameters dynamically during runtime.
- **CSV Logging:** Record all robot interactions and sensor data for easy analysis.
- **Live Graphing:** Real-time visualisation of position, velocity, acceleration, and force.
- **Command-Line Interface (CLI):** User-friendly terminal-based robot interaction.
- **Python API:** Easily integrate RSIPI into custom Python applications.
## 📂 Project Structure
```
RSI-PI/
├── src/
│ └── RSIPI/
│ ├── rsi_api.py
│ ├── rsi_client.py
│ ├── network_process.py
│ ├── config_parser.py
│ ├── xml_handler.py
│ ├── rsi_config.py
│ ├── cli.py
│ ├── graphing.py
│ └── test_rsipi.py
├── RSI_EthernetConfig.xml
├── setup.py
└── README.md
```
## ⚙️ Installation
To install RSIPI, clone the repository and use `pip`:
```bash
git clone <your-repo-url>
cd RSI-PI
pip install .
```
## ▶️ Getting Started
**Quick example:**
```python
from RSIPI import RSIAPI
from time import sleep
# Initialise RSIAPI
api = RSIAPI("RSI_EthernetConfig.xml")
# Start RSI communication
api.start_rsi()
# Dynamically update a variable
api.update_variable("EStr", "RSIPI Test")
# Log data to CSV
api.start_logging("robot_data.csv")
sleep(10)
# Stop logging and RSI communication
api.stop_logging()
api.stop_rsi()
```
## 💻 Command-Line Interface
Start the CLI for interactive robot control:
```bash
python src/RSIPI/cli.py
```
Example CLI commands:
- `start`: Start RSI communication
- `set EStr HelloWorld`: Update variable
- `log start data.csv`: Start logging
- `graph start position`: Start live graphing
- `stop`: Stop RSI communication
## 📊 Graphing and Visualisation
RSIPI supports real-time plotting and analysis:
```python
api.start_graphing(mode="position") # Real-time position graph
```
## 🧪 Running Tests
Tests are provided to verify all functionalities:
```bash
python -m unittest discover -s src/RSIPI -p "test_*.py"
```
## 📝 Citation
If you use RSIPI in your research, please cite it:
```
@misc{YourName2025RSIPI,
author = {Your Name},
title = {RSIPI: Robot Sensor Interface Python Integration},
year = {2025},
publisher = {GitHub},
journal = {GitHub repository},
url = {https://github.com/yourusername/RSIPI},
}
```
## 📚 License
RSIPI is released under the [MIT License](LICENSE).

23
setup.py Normal file
View File

@ -0,0 +1,23 @@
from setuptools import setup, find_packages
setup(
name="RSIPI",
version="0.1.0",
author="Your Name",
author_email="your.email@example.com",
description="Robot Sensor Interface Python Integration for KUKA Robots",
packages=find_packages(where="src"),
package_dir={"": "src"},
install_requires=[
"numpy",
"matplotlib",
"pandas",
# Other dependencies
],
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
python_requires='>=3.8',
)

View File

@ -20,7 +20,7 @@ class ConfigParser:
"BMode": "Status", "BMode": "Status",
"IPOSTAT": "", "IPOSTAT": "",
"Delay": ["D"], "Delay": ["D"],
"EStr": "EStr Test", "EStr": "RSIPI: Client started",
"Tech.C1": {"C11":0, "C12":0, "C13":0, "C14":0, "C15":0, "C16":0, "C17":0, "C18":0, "C19":0, "C110":0}, "Tech.C1": {"C11":0, "C12":0, "C13":0, "C14":0, "C15":0, "C16":0, "C17":0, "C18":0, "C19":0, "C110":0},
"Tech.C2": {"C21":0, "C22":0, "C23":0, "C24":0, "C25":0, "C26":0, "C27":0, "C28":0, "C29":0, "C210":0}, "Tech.C2": {"C21":0, "C22":0, "C23":0, "C24":0, "C25":0, "C26":0, "C27":0, "C28":0, "C29":0, "C210":0},
"Tech.C3": {"C31":0, "C32":0, "C33":0, "C34":0, "C35":0, "C36":0, "C37":0, "C38":0, "C39":0, "C310":0}, "Tech.C3": {"C31":0, "C32":0, "C33":0, "C34":0, "C35":0, "C36":0, "C37":0, "C38":0, "C39":0, "C310":0},
@ -107,17 +107,17 @@ class ConfigParser:
print(f"🔍 Assigning {tag}: INDX={indx}, TYPE={var_type}") print(f"🔍 Assigning {tag}: INDX={indx}, TYPE={var_type}")
if indx == "INTERNAL": if tag in self.internal_structure:
if tag in self.internal_structure: internal_value = self.internal_structure[tag]
internal_value = self.internal_structure[tag] if isinstance(internal_value, dict):
var_dict[tag] = internal_value.copy() if isinstance(internal_value, dict) else internal_value var_dict[tag] = internal_value.copy()
print(f"✅ INTERNAL Match: {tag} -> {var_dict[tag]}")
else: else:
print(f"❌ INTERNAL variable '{tag}' not found in internal dictionary.") var_dict[tag] = internal_value # ✅ Assign default internal value (e.g., "EStr Test")
elif "." in tag: # ✅ Handle elements in the format XXX.ZZ print(f"✅ INTERNAL Match: {tag} -> {var_dict[tag]}")
elif "." in tag:
parent, subkey = tag.split(".", 1) parent, subkey = tag.split(".", 1)
if parent not in var_dict: if parent not in var_dict:
var_dict[parent] = {} # ✅ Create parent dictionary if not exists var_dict[parent] = {}
var_dict[parent][subkey] = self.get_default_value(var_type) var_dict[parent][subkey] = self.get_default_value(var_type)
print(f"📂 Assigned '{tag}' as nested dictionary under '{parent}': {var_dict[parent]}") print(f"📂 Assigned '{tag}' as nested dictionary under '{parent}': {var_dict[parent]}")
else: else:

View File

@ -77,6 +77,7 @@ class EchoServer:
while self.running: while self.running:
try: try:
message = self.generate_message() message = self.generate_message()
print(f"Sending message:\n{message}")
logging.debug(f"📡 Sending message to RSI client with IPOC={self.ipoc_value}...") logging.debug(f"📡 Sending message to RSI client with IPOC={self.ipoc_value}...")
print(f"📡 Sending message to RSI client with IPOC={self.ipoc_value}...") print(f"📡 Sending message to RSI client with IPOC={self.ipoc_value}...")
self.udp_socket.sendto(message.encode(), self.client_address) # ✅ Send without waiting self.udp_socket.sendto(message.encode(), self.client_address) # ✅ Send without waiting

View File

@ -5,8 +5,9 @@ import time
import csv import csv
import logging import logging
import xml.etree.ElementTree as ET # ✅ FIX: Import ElementTree import xml.etree.ElementTree as ET # ✅ FIX: Import ElementTree
from config_parser import ConfigParser from .config_parser import ConfigParser
from xml_handler import XMLGenerator from .xml_handler import XMLGenerator
class NetworkProcess(multiprocessing.Process): class NetworkProcess(multiprocessing.Process):
"""Handles UDP communication and optional CSV logging in a separate process.""" """Handles UDP communication and optional CSV logging in a separate process."""
@ -58,10 +59,12 @@ class NetworkProcess(multiprocessing.Process):
try: try:
self.udp_socket.settimeout(5) self.udp_socket.settimeout(5)
data_received, self.controller_ip_and_port = self.udp_socket.recvfrom(1024) data_received, self.controller_ip_and_port = self.udp_socket.recvfrom(1024)
print(data_received)
message = data_received.decode() message = data_received.decode()
self.process_received_data(message) self.process_received_data(message)
send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings) send_xml = XMLGenerator.generate_send_xml(self.send_variables, self.config_parser.network_settings)
print(send_xml)
self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port) self.udp_socket.sendto(send_xml.encode(), self.controller_ip_and_port)
# ✅ If logging is active, write data to CSV # ✅ If logging is active, write data to CSV
@ -74,7 +77,6 @@ class NetworkProcess(multiprocessing.Process):
print(f"[ERROR] Network process error: {e}") print(f"[ERROR] Network process error: {e}")
def process_received_data(self, xml_string): def process_received_data(self, xml_string):
"""Parse incoming XML and update shared variables."""
try: try:
root = ET.fromstring(xml_string) root = ET.fromstring(xml_string)
for element in root: for element in root:
@ -83,6 +85,12 @@ class NetworkProcess(multiprocessing.Process):
self.receive_variables[element.tag] = {k: float(v) for k, v in element.attrib.items()} self.receive_variables[element.tag] = {k: float(v) for k, v in element.attrib.items()}
else: else:
self.receive_variables[element.tag] = element.text self.receive_variables[element.tag] = element.text
# specifically capture IPOC from received message
if element.tag == "IPOC":
received_ipoc = int(element.text)
self.receive_variables["IPOC"] = received_ipoc
self.send_variables["IPOC"] = received_ipoc + 4 # Increment by 4 ms
except Exception as e: except Exception as e:
print(f"[ERROR] Error parsing received message: {e}") print(f"[ERROR] Error parsing received message: {e}")

View File

@ -3,8 +3,8 @@ import pandas as pd
import numpy as np import numpy as np
import json import json
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from src.RSIPI.rsi_client import RSIClient from .rsi_client import RSIClient
from src.RSIPI.graphing import RSIGraphing from .graphing import RSIGraphing
class RSIAPI: class RSIAPI:
"""RSI API for programmatic control, including alerts, logging, graphing, and data retrieval.""" """RSI API for programmatic control, including alerts, logging, graphing, and data retrieval."""

View File

@ -1,8 +1,8 @@
import logging import logging
import multiprocessing import multiprocessing
import time import time
from src.RSIPI.config_parser import ConfigParser from .config_parser import ConfigParser
from src.RSIPI.network_process import NetworkProcess from .network_process import NetworkProcess
class RSIClient: class RSIClient:
"""Main RSI API class that integrates network, config handling, and message processing.""" """Main RSI API class that integrates network, config handling, and message processing."""

93
src/RSIPI/test_rsipi.py Normal file
View File

@ -0,0 +1,93 @@
import unittest
from time import sleep
from rsi_api import RSIAPI
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
class TestRSIPI(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.api = RSIAPI("RSI_EthernetConfig.xml")
cls.api.start_rsi()
sleep(2)
@classmethod
def tearDownClass(cls):
cls.api.stop_rsi()
def test_update_variable(self):
response = self.api.update_variable("EStr", "TestMessage")
self.assertIn("✅ Updated EStr to TestMessage", response)
def test_toggle_digital_io(self):
response = self.api.toggle_digital_io("DiO", 1)
self.assertIn("✅ DiO set to 1", response)
def test_move_external_axis(self):
response = self.api.move_external_axis("E1", 150.0)
self.assertIn("✅ Moved E1 to 150.0", response)
def test_correct_position_rkorr(self):
response = self.api.correct_position("RKorr", "X", 10.5)
self.assertIn("✅ Applied correction: RKorr.X = 10.5", response)
def test_correct_position_akorr(self):
response = self.api.correct_position("AKorr", "A1", 5.0)
self.assertIn("✅ Applied correction: AKorr.A1 = 5.0", response)
def test_adjust_speed(self):
response = self.api.adjust_speed("Tech.T21", 2.5)
self.assertIn("✅ Set Tech.T21 to 2.5", response)
def test_logging_start_and_stop(self):
response_start = self.api.start_logging("test_log.csv")
self.assertIn("✅ CSV Logging started", response_start)
sleep(2)
response_stop = self.api.stop_logging()
self.assertIn("🛑 CSV Logging stopped", response_stop)
def test_graphing_start_and_stop(self):
response_start = self.api.start_graphing(mode="position")
self.assertIn("✅ Graphing started in position mode", response_start)
sleep(5)
response_stop = self.api.stop_graphing()
self.assertIn("🛑 Graphing stopped", response_stop)
def test_get_live_data(self):
data = self.api.get_live_data()
self.assertIn("position", data)
self.assertIn("force", data)
def test_get_ipoc(self):
ipoc = self.api.get_ipoc()
self.assertTrue(isinstance(ipoc, int) or ipoc.isdigit())
def test_reconnect(self):
response = self.api.reconnect()
self.assertIn("✅ Network connection restarted", response)
def test_reset_variables(self):
response = self.api.reset_variables()
self.assertIn("✅ Send variables reset to default values", response)
def test_get_status(self):
status = self.api.get_status()
self.assertIn("network", status)
self.assertIn("send_variables", status)
self.assertIn("receive_variables", status)
def test_export_data(self):
response = self.api.export_movement_data("export_test.csv")
self.assertIn("✅ Data exported to export_test.csv", response)
def test_alert_toggle_and_threshold(self):
response_enable = self.api.enable_alerts(True)
self.assertIn("✅ Alerts enabled", response_enable)
response_threshold = self.api.set_alert_threshold("deviation", 3.5)
self.assertIn("✅ Deviation alert threshold set to 3.5", response_threshold)
if __name__ == '__main__':
unittest.main()

View File

@ -9,6 +9,9 @@ class XMLGenerator:
root = ET.Element("Sen", Type=network_settings["sentype"]) # ✅ Root with Type from config root = ET.Element("Sen", Type=network_settings["sentype"]) # ✅ Root with Type from config
for key, value in send_variables.items(): for key, value in send_variables.items():
if key == "FREE":
continue # explicitly skip FREE
if isinstance(value, dict): # ✅ Handle dictionaries as elements with attributes if isinstance(value, dict): # ✅ Handle dictionaries as elements with attributes
element = ET.SubElement(root, key) element = ET.SubElement(root, key)
for sub_key, sub_value in value.items(): for sub_key, sub_value in value.items():