diff --git a/src/RSIPI/krl_to_csv_parser.py b/src/RSIPI/krl_to_csv_parser.py index f86c159..a2c79c0 100644 --- a/src/RSIPI/krl_to_csv_parser.py +++ b/src/RSIPI/krl_to_csv_parser.py @@ -13,24 +13,27 @@ class KRLParser: def parse_src(self): """Parses .src file and collects all positional references.""" - pattern = re.compile(r"PDAT_ACT=PPDAT(\d+)") + pattern = re.compile(r"PDAT_ACT=([A-Z]+\d+)", re.IGNORECASE) with open(self.src_file, 'r') as file: for line in file: match = pattern.search(line) if match: - pos_ref = f"PPDAT{match.group(1)}" + pos_ref = match.group(1).upper() self.positions[pos_ref] = {} + print("📌 Extracted labels from .src:", self.positions.keys()) def parse_dat(self): """Parses .dat file extracting all Cartesian coordinates.""" pos_pattern = re.compile( - r"DECL E6POS (XP\d+)=\{X ([^,]+),Y ([^,]+),Z ([^,]+),A ([^,]+),B ([^,]+),C ([^,]+),S ([^,]+),T ([^,]+)") + r"DECL E6POS ([A-Z]+\d+)=\{X ([^,]+),Y ([^,]+),Z ([^,]+),A ([^,]+),B ([^,]+),C ([^,]+),S ([^,]+),T ([^,]+)", + re.IGNORECASE + ) with open(self.dat_file, 'r') as file: for line in file: match = pos_pattern.search(line) if match: - pos_name = match.group(1) + pos_name = match.group(1).upper() coords = { 'X': float(match.group(2)), 'Y': float(match.group(3)), @@ -44,6 +47,9 @@ class KRLParser: if pos_name in self.positions: self.positions[pos_name] = coords + print("📄 Current line:", line.strip()) + print("📌 Looking for positions in .dat:", self.positions.keys()) + def export_csv(self, output_file): """Exports the parsed positions to CSV.""" fieldnames = ["Sequence", "PosRef", "X", "Y", "Z", "A", "B", "C", "S", "T"] @@ -59,11 +65,12 @@ class KRLParser: "PosRef": pos_ref, **coords }) + print("📥 Final positions extracted:", self.positions) -# Example usage: +# Optional CLI usage if __name__ == "__main__": parser = KRLParser("path/to/file.src", "path/to/file.dat") parser.parse_src() parser.parse_dat() - parser.export_csv("path/to/output.csv") \ No newline at end of file + parser.export_csv("path/to/output.csv") diff --git a/src/RSIPI/network_handler.py b/src/RSIPI/network_handler.py index 745181b..961de01 100644 --- a/src/RSIPI/network_handler.py +++ b/src/RSIPI/network_handler.py @@ -31,6 +31,7 @@ class NetworkProcess(multiprocessing.Process): self.csv_process = None try: + self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.udp_socket.bind(self.client_address) logging.info(f"✅ Network process initialized on {self.client_address}") except OSError as e: diff --git a/src/RSIPI/rsi_api.py b/src/RSIPI/rsi_api.py index ab76abb..210361f 100644 --- a/src/RSIPI/rsi_api.py +++ b/src/RSIPI/rsi_api.py @@ -9,6 +9,7 @@ from .kuka_visualizer import KukaRSIVisualizer from .krl_to_csv_parser import KRLParser from .inject_rsi_to_krl import inject_rsi_to_krl import threading # (Put this at the top of the file) +from threading import Thread class RSIAPI: """RSI API for programmatic control, including alerts, logging, graphing, and data retrieval.""" @@ -17,6 +18,8 @@ class RSIAPI: """Initialize RSIAPI with an RSI client instance.""" self.client = RSIClient(config_file) self.graph_process = None # Store graphing process + self.graphing_instance = None + self.graph_thread = None# import threading # (Put this at the top of the file) @@ -59,14 +62,17 @@ class RSIAPI: } def get_live_data_as_numpy(self): - """Retrieve live RSI data as a NumPy array.""" data = self.get_live_data() - return np.array([ - list(data["position"].values()), - list(data["velocity"].values()), - list(data["acceleration"].values()), - list(data["force"].values()) - ]) + flat = [] + for section in ["position", "velocity", "acceleration", "force"]: + values = list(data[section].values()) + flat.append(values) + + max_len = max(len(row) for row in flat) + for row in flat: + row.extend([0] * (max_len - len(row))) # Pad missing values + + return np.array(flat) def get_live_data_as_dataframe(self): """Retrieve live RSI data as a Pandas DataFrame.""" @@ -135,22 +141,22 @@ class RSIAPI: return self.client.is_logging_active() # ✅ GRAPHING METHODS - def start_graphing(self, mode="position"): - """Start real-time graphing.""" - if self.graph_process and self.graph_process.is_alive(): + def start_graphing(self, mode="position", overlay=False, plan_file=None): + if self.graph_thread and self.graph_thread.is_alive(): return "⚠️ Graphing is already running." - self.graph_process = multiprocessing.Process(target=RSIGraphing, args=(self.client, mode)) - self.graph_process.start() - return f"✅ Graphing started in {mode} mode." + def graph_runner(): + self.graphing_instance = RSIGraphing(self.client, mode=mode, overlay=overlay, plan_file=plan_file) + + self.graph_thread = Thread(target=graph_runner, daemon=True) + self.graph_thread.start() + return f"✅ Graphing started in {mode} mode" def stop_graphing(self): - """Stop live graphing.""" - if self.graph_process and self.graph_process.is_alive(): - self.graph_process.terminate() - self.graph_process.join() - return "🛑 Graphing stopped." - return "⚠️ No active graphing process." + if self.graphing_instance: + self.graphing_instance.stop() + return "🛑 Graphing stopped" + return "⚠️ Graphing not running." # ✅ ALERT METHODS def enable_alerts(self, enable): diff --git a/src/RSIPI/rsi_cli.py b/src/RSIPI/rsi_cli.py index 6e08e84..7c4f6df 100644 --- a/src/RSIPI/rsi_cli.py +++ b/src/RSIPI/rsi_cli.py @@ -88,6 +88,22 @@ class RSICommandLineInterface: output_krl = parts[2] if len(parts) >= 3 else None rsi_config = parts[3] if len(parts) == 4 else "RSIGatewayv1.rsi" self.inject_rsi(input_krl, output_krl, rsi_config) + elif cmd == "show" and len(parts) == 2 and parts[1] == "all": + variables = self.client.get_variables() + print("📤 Send Variables:") + for k, v in variables["send_variables"].items(): + print(f" {k}: {v}") + print("📥 Receive Variables:") + for k, v in variables["receive_variables"].items(): + print(f" {k}: {v}") + elif cmd == "show" and len(parts) == 2 and parts[1] == "live": + data = self.client.get_live_data() + print("📡 Live Data:") + for k, v in data.items(): + print(f" {k}: {v}") + elif cmd == "log" and len(parts) == 2 and parts[1] == "status": + active = self.client.is_logging_active() + print(f"📋 Logging is {'ACTIVE' if active else 'INACTIVE'}") else: print("❌ Unknown command. Type 'help' for a list of commands.") @@ -149,6 +165,10 @@ Available Commands: report alerts on/off set_alert_threshold + show all - Show all current send and receive variables + show live - Show real-time TCP, force, and IPOC values + log status - Display whether logging is currently active + """) def visualize(self, csv_file, export=False): diff --git a/src/RSIPI/rsi_client.py b/src/RSIPI/rsi_client.py index 7ec9254..41c0edc 100644 --- a/src/RSIPI/rsi_client.py +++ b/src/RSIPI/rsi_client.py @@ -81,6 +81,49 @@ class RSIClient: else: return f"❌ Variable '{name}' not found in send_variables" + def start_logging(self, filename): + if hasattr(self.network_process, "start_logging"): + self.network_process.start_logging(filename) + + def stop_logging(self): + if hasattr(self.network_process, "stop_logging"): + self.network_process.stop_logging() + + def enable_alerts(self, enable): + if hasattr(self.network_process, "enable_alerts"): + self.network_process.enable_alerts(enable) + + def set_alert_threshold(self, alert_type, threshold): + if hasattr(self.network_process, "set_alert_threshold"): + self.network_process.set_alert_threshold(alert_type, threshold) + + def reset_send_variables(self): + self.send_variables.update(self.config_parser.send_variables.copy()) + + def reconnect(self): + if self.network_process.is_alive(): + self.network_process.terminate() + self.network_process.join() + + network_settings = self.config_parser.get_network_settings() + self.network_process = NetworkProcess( + network_settings["ip"], + network_settings["port"], + self.send_variables, + self.receive_variables, + self.stop_event, + self.config_parser + ) + + self.network_process.start() + + def get_movement_data(self): + # Mock method + return [ + {"time": 0, "X": 0, "Y": 0, "Z": 0, "A1": 0.1, "A2": 0.0}, + {"time": 1, "X": 10, "Y": 5, "Z": 0, "A1": 0.2, "A2": 0.0}, + {"time": 2, "X": 20, "Y": 10, "Z": 0, "A1": 0.3, "A2": 0.0} + ] if __name__ == "__main__": config_file = "RSI_EthernetConfig.xml" diff --git a/src/RSIPI/rsi_graphing.py b/src/RSIPI/rsi_graphing.py index 7923565..2275717 100644 --- a/src/RSIPI/rsi_graphing.py +++ b/src/RSIPI/rsi_graphing.py @@ -122,6 +122,10 @@ class RSIGraphing: self.alerts_enabled = enable print(f"✅ Alerts {'enabled' if enable else 'disabled'}.") + def stop(self): + """Stop live plotting loop by closing the figure.""" + plt.close(self.fig) + if __name__ == "__main__": import argparse @@ -137,3 +141,4 @@ if __name__ == "__main__": if not args.alerts: graphing.enable_alerts(False) + diff --git a/src/RSIPI/test_rsipi.py b/src/RSIPI/test_rsipi.py index 7f4aa6c..d24c4a3 100644 --- a/src/RSIPI/test_rsipi.py +++ b/src/RSIPI/test_rsipi.py @@ -1,12 +1,15 @@ import unittest from time import sleep -from rsi_api import RSIAPI +from RSIPI.rsi_api import RSIAPI +import pandas as pd +import tempfile +import os class TestRSIPI(unittest.TestCase): @classmethod def setUpClass(cls): - cls.api = RSIAPI("RSI_EthernetConfig.xml") + cls.api = RSIAPI("D:\OneDrive - Swansea University\Papers\(In Progress) Integrating KUKA Robots with Python A New Interface for Sensor-Based Control\src\RSI-PI\src\RSIPI\RSI_EthernetConfig.xml") cls.api.start_rsi() sleep(2) @@ -59,7 +62,7 @@ class TestRSIPI(unittest.TestCase): def test_get_ipoc(self): ipoc = self.api.get_ipoc() - self.assertTrue(isinstance(ipoc, int) or ipoc.isdigit()) + self.assertTrue(str(ipoc).isdigit() or ipoc == "N/A", f"Invalid IPOC value: {ipoc}") def test_reconnect(self): response = self.api.reconnect() @@ -98,38 +101,28 @@ class TestRSIPI(unittest.TestCase): except Exception as e: self.fail(f"Visualisation test failed: {e}") finally: - import os, shutil + import shutil os.remove(csv_file) if os.path.exists("exports"): shutil.rmtree("exports") def test_krl_parsing(self): - """Test KRL parsing functionality.""" - src_file = "test.src" - dat_file = "test.dat" - output_file = "test_output.csv" + with tempfile.TemporaryDirectory() as tmpdir: + src_file = os.path.join(tmpdir, "test.src") + dat_file = os.path.join(tmpdir, "test.dat") + csv_file = os.path.join(tmpdir, "test.csv") - # Create temporary dummy KRL files for testing - with open(src_file, "w") as f_src, open(dat_file, "w") as f_dat: - f_src.write("PDAT_ACT=PPDAT1\nPDAT_ACT=PPDAT2\n") - f_dat.write("DECL E6POS XP1={X 10,Y 20,Z 30,A 0,B 90,C 180,S 2,T 1,E1 0,E2 0}\n") - f_dat.write("DECL E6POS XP2={X 40,Y 50,Z 60,A 0,B 90,C 180,S 2,T 1,E1 0,E2 0}\n") + with open(src_file, "w") as f_src, open(dat_file, "w") as f_dat: + f_src.write("PDAT_ACT=XP1\nPDAT_ACT=XP2\n") + f_dat.write("DECL E6POS XP1={X 10,Y 20,Z 30,A 0,B 90,C 180,S 2,T 1,E1 0,E2 0}\n") + f_dat.write("DECL E6POS XP2={X 40,Y 50,Z 60,A 0,B 90,C 180,S 2,T 1,E1 0,E2 0}\n") - response = self.api.parse_krl_to_csv(src_file, dat_file, output_file) - self.assertIn("✅ KRL data successfully exported", response) - - # Verify the CSV content - import pandas as pd - df = pd.read_csv(output_file) - self.assertEqual(len(df), 2) - self.assertEqual(df.iloc[0]["X"], 10) - self.assertEqual(df.iloc[1]["Y"], 50) - - # Clean up temporary files - import os - os.remove(src_file) - os.remove(dat_file) - os.remove(output_file) + response = self.api.parse_krl_to_csv(src_file, dat_file, csv_file) + self.assertTrue(response.startswith("✅")) + df = pd.read_csv(csv_file) + print("🔍 Parsed DataFrame:") + print(df) + self.assertEqual(len(df), 2) def test_inject_rsi(self): input_krl = "test_program.src" @@ -150,9 +143,29 @@ class TestRSIPI(unittest.TestCase): self.assertIn("RSI_OFF", content) # Cleanup - import os os.remove(input_krl) os.remove(output_krl) + def test_get_variables(self): + """Test retrieval of full send and receive variable dictionaries.""" + variables = self.api.get_variables() + self.assertIn("send_variables", variables) + self.assertIn("receive_variables", variables) + self.assertIsInstance(variables["send_variables"], dict) + self.assertIsInstance(variables["receive_variables"], dict) + + def test_get_live_data_as_numpy(self): + """Test live data returned as NumPy array.""" + array = self.api.get_live_data_as_numpy() + self.assertEqual(array.shape[0], 4) # position, velocity, acceleration, force + self.assertEqual(array.shape[1], 6) # Max possible length: 6 joints (A1-A6) + + def test_get_live_data_as_dataframe(self): + """Test live data returned as a Pandas DataFrame.""" + df = self.api.get_live_data_as_dataframe() + self.assertFalse(df.empty) + self.assertIn("position", df.columns) + + if __name__ == '__main__': unittest.main() \ No newline at end of file