From 68baf11c938f0bebbf88d925134a29861a3ca8a9 Mon Sep 17 00:00:00 2001 From: Adam Date: Fri, 25 Apr 2025 21:31:29 +0100 Subject: [PATCH] Now all API is done. --- src/RSIPI/krl_to_csv_parser.py | 115 +++++++++++++++++---------------- src/RSIPI/rsi_api.py | 96 +++++++++++++-------------- src/RSIPI/rsi_cli.py | 4 +- 3 files changed, 108 insertions(+), 107 deletions(-) diff --git a/src/RSIPI/krl_to_csv_parser.py b/src/RSIPI/krl_to_csv_parser.py index a7a8022..592257a 100644 --- a/src/RSIPI/krl_to_csv_parser.py +++ b/src/RSIPI/krl_to_csv_parser.py @@ -2,96 +2,97 @@ import csv import re from collections import OrderedDict - -# noinspection PyTypeChecker class KRLParser: """ - Parses KUKA KRL .src and .dat files to extract Cartesian coordinates + Parses KUKA KRL .src and .dat files to extract TCP setpoints and exports them into a structured CSV format. """ def __init__(self, src_file, dat_file): - """ - Initialise the parser with the paths to the .src and .dat files. - - Args: - src_file (str): Path to the KRL .src file. - dat_file (str): Path to the KRL .dat file. - """ self.src_file = src_file self.dat_file = dat_file - self.positions = OrderedDict() # Maintain order of appearance from .src + self.positions = OrderedDict() # Maintain order of appearance + self.labels_to_extract = [] # Store labels found in .src (e.g., XP310, XP311) def parse_src(self): """ - Parses the .src file to extract all position references (labels like P1, P2, etc.). - These are later used to match against coordinate data from the .dat file. + Parses the .src file to extract motion commands and their labels (e.g., PTP XP310). 
""" - pattern = re.compile(r"PDAT_ACT=([A-Z]+\d+)", re.IGNORECASE) - with open(self.src_file, 'r') as file: - for line in file: - match = pattern.search(line) - if match: - pos_ref = match.group(1).upper() - self.positions[pos_ref] = {} # Placeholder for coordinates + move_pattern = re.compile(r"\bPTP\s+(\w+)", re.IGNORECASE) - print("📌 Extracted labels from .src:", self.positions.keys()) + with open(self.src_file, 'r', encoding='utf-8') as file: + for line in file: + match = move_pattern.search(line) + if match: + label = match.group(1).strip().upper() + if label not in self.labels_to_extract: + self.labels_to_extract.append(label) + + print(f"📌 Found labels in .src: {self.labels_to_extract}") def parse_dat(self): """ - Parses the .dat file and retrieves Cartesian coordinate data for each known position reference. - Matches only those positions previously found in the .src file. + Parses the .dat file and retrieves Cartesian coordinates for each label. """ - pos_pattern = re.compile( - r"DECL E6POS ([A-Z]+\d+)=\{X ([^,]+),Y ([^,]+),Z ([^,]+),A ([^,]+),B ([^,]+),C ([^,]+),S ([^,]+),T ([^,]+)", - re.IGNORECASE - ) + pos_pattern = re.compile(r"DECL\s+E6POS\s+(\w+)\s*=\s*\{([^}]*)\}", re.IGNORECASE) - with open(self.dat_file, 'r') as file: + with open(self.dat_file, 'r', encoding='utf-8') as file: for line in file: match = pos_pattern.search(line) if match: - pos_name = match.group(1).upper() - coords = { - 'X': float(match.group(2)), - 'Y': float(match.group(3)), - 'Z': float(match.group(4)), - 'A': float(match.group(5)), - 'B': float(match.group(6)), - 'C': float(match.group(7)), - 'S': int(match.group(8)), - 'T': int(match.group(9)), - } - # Only store coordinates if the label was found in the .src - if pos_name in self.positions: - self.positions[pos_name] = coords + label = match.group(1).strip().upper() + coords_text = match.group(2) - print("📄 Current line:", line.strip()) + coords = {} + for entry in coords_text.split(','): + key_value = entry.strip().split() 
+ if len(key_value) == 2: + key, value = key_value + try: + if key in ["S", "T"]: + coords[key] = int(float(value)) + else: + coords[key] = float(value) + except ValueError: + coords[key] = 0 # fallback - print("📌 Looking for positions in .dat:", self.positions.keys()) + self.positions[label] = coords + + print(f"📥 Parsed {len(self.positions)} positions from .dat") def export_csv(self, output_file): """ - Writes the extracted position data into a CSV file. - - Args: - output_file (str): Path to the output CSV file. + Writes the extracted Cartesian positions into a structured CSV file, + skipping any deleted/missing points. """ fieldnames = ["Sequence", "PosRef", "X", "Y", "Z", "A", "B", "C", "S", "T"] - with open(output_file, 'w', newline='') as csv_file: + with open(output_file, 'w', newline='', encoding='utf-8') as csv_file: writer = csv.DictWriter(csv_file, fieldnames=fieldnames) writer.writeheader() - for idx, (pos_ref, coords) in enumerate(self.positions.items()): - if coords: # Skip empty entries (e.g. 
unmatched labels) - writer.writerow({ - "Sequence": idx, - "PosRef": pos_ref, - **coords - }) + sequence_number = 0 # Only count real points - print("📥 Final positions extracted:", self.positions) + for label in self.labels_to_extract: + coords = self.positions.get(label) + if coords: + writer.writerow({ + "Sequence": sequence_number, + "PosRef": label, + "X": coords.get("X", 0), + "Y": coords.get("Y", 0), + "Z": coords.get("Z", 0), + "A": coords.get("A", 0), + "B": coords.get("B", 0), + "C": coords.get("C", 0), + "S": coords.get("S", 0), + "T": coords.get("T", 0), + }) + sequence_number += 1 + else: + print(f"⚠️ Skipped missing/deleted point: {label}") + + print(f"✅ CSV exported successfully to {output_file} with {sequence_number} points.") # Optional CLI usage diff --git a/src/RSIPI/rsi_api.py b/src/RSIPI/rsi_api.py index 01e2618..9d4f02a 100644 --- a/src/RSIPI/rsi_api.py +++ b/src/RSIPI/rsi_api.py @@ -15,6 +15,54 @@ from src.RSIPI.live_plotter import LivePlotter from threading import Thread import asyncio + +def generate_report(filename, format_type): + """ + Generate a statistical report from a CSV log file. + + Args: + filename (str): Path to the CSV file (or base name without .csv). + format_type (str): 'csv', 'json', or 'pdf' + """ + # Ensure filename ends with .csv + if not filename.endswith(".csv"): + filename += ".csv" + + if not os.path.exists(filename): + raise FileNotFoundError(f"❌ File not found: {filename}") + + df = pd.read_csv(filename) + + # Only keep relevant columns (e.g. 
actual positions) + position_cols = [col for col in df.columns if col.startswith("Receive.RIst.")] + if not position_cols: + raise ValueError("❌ No 'Receive.RIst' position columns found in CSV.") + + report_data = { + "Max Position": df[position_cols].max().to_dict(), + "Mean Position": df[position_cols].mean().to_dict(), + } + + report_base = filename.replace(".csv", "") + output_path = f"{report_base}_report.{format_type.lower()}" + + if format_type == "csv": + pd.DataFrame(report_data).T.to_csv(output_path) + elif format_type == "json": + with open(output_path, "w") as f: + json.dump(report_data, f, indent=4) + elif format_type == "pdf": + fig, ax = plt.subplots() + pd.DataFrame(report_data).T.plot(kind='bar', ax=ax) + ax.set_title("RSI Position Report") + plt.tight_layout() + plt.savefig(output_path) + else: + raise ValueError(f"❌ Unsupported format: {format_type}") + + return f"✅ Report saved as {output_path}" + + class RSIAPI: """RSI API for programmatic control, including alerts, logging, graphing, and data retrieval.""" @@ -282,52 +330,6 @@ class RSIAPI: return f"✅ {alert_type.capitalize()} alert threshold set to {value}" return "❌ Invalid alert type. Use 'deviation' or 'force'." - def generate_report(self, filename, format_type): - """ - Generate a statistical report from a CSV log file. - - Args: - filename (str): Path to the CSV file (or base name without .csv). - format_type (str): 'csv', 'json', or 'pdf' - """ - # Ensure filename ends with .csv - if not filename.endswith(".csv"): - filename += ".csv" - - if not os.path.exists(filename): - raise FileNotFoundError(f"❌ File not found: {filename}") - - df = pd.read_csv(filename) - - # Only keep relevant columns (e.g. 
actual positions) - position_cols = [col for col in df.columns if col.startswith("Receive.RIst.")] - if not position_cols: - raise ValueError("❌ No 'Receive.RIst' position columns found in CSV.") - - report_data = { - "Max Position": df[position_cols].max().to_dict(), - "Mean Position": df[position_cols].mean().to_dict(), - } - - report_base = filename.replace(".csv", "") - output_path = f"{report_base}_report.{format_type.lower()}" - - if format_type == "csv": - pd.DataFrame(report_data).T.to_csv(output_path) - elif format_type == "json": - with open(output_path, "w") as f: - json.dump(report_data, f, indent=4) - elif format_type == "pdf": - fig, ax = plt.subplots() - pd.DataFrame(report_data).T.plot(kind='bar', ax=ax) - ax.set_title("RSI Position Report") - plt.tight_layout() - plt.savefig(output_path) - else: - raise ValueError(f"❌ Unsupported format: {format_type}") - - return f"✅ Report saved as {output_path}" - @staticmethod def visualise_csv_log(csv_file, export=False): """ @@ -345,7 +347,6 @@ class RSIAPI: if export: visualizer.export_graphs() - ## TODO Need to test parsing krl to csv @staticmethod def parse_krl_to_csv(src_file, dat_file, output_file): """ @@ -365,7 +366,6 @@ class RSIAPI: except Exception as e: return f"❌ Error parsing KRL files: {e}" - ## TODO Need to test injecting RSI code. 
@staticmethod def inject_rsi(input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"): """ diff --git a/src/RSIPI/rsi_cli.py b/src/RSIPI/rsi_cli.py index 371a800..f35e6a5 100644 --- a/src/RSIPI/rsi_cli.py +++ b/src/RSIPI/rsi_cli.py @@ -199,7 +199,7 @@ class RSICommandLineInterface: print(f"{key}: mean_diff={stats['mean_diff']:.3f}, max_diff={stats['max_diff']:.3f}") elif cmd == "generate_report" and len(parts) in [2, 3]: output = parts[2] if len(parts) == 3 else "report.txt" - result = self.client.generate_report(parts[1], output) + result = generate_report(parts[1], output) print(result) elif cmd == "safety-stop": self.client.safety_manager.emergency_stop() @@ -279,7 +279,7 @@ class RSICommandLineInterface: if format_type not in ["csv", "json", "pdf"]: print("❌ Invalid format. Use 'csv', 'json', or 'pdf'.") return - self.client.generate_report(filename, format_type) + generate_report(filename, format_type) print(f"✅ Report generated: {filename}.{format_type}")