Now all API is done.

This commit is contained in:
Adam 2025-04-25 21:31:29 +01:00
parent d40ba36c74
commit 68baf11c93
3 changed files with 108 additions and 107 deletions

View File

@ -2,96 +2,97 @@ import csv
import re
from collections import OrderedDict
# noinspection PyTypeChecker
class KRLParser:
"""
Parses KUKA KRL .src and .dat files to extract Cartesian coordinates
Parses KUKA KRL .src and .dat files to extract TCP setpoints
and exports them into a structured CSV format.
"""
def __init__(self, src_file, dat_file):
"""
Initialise the parser with the paths to the .src and .dat files.
Args:
src_file (str): Path to the KRL .src file.
dat_file (str): Path to the KRL .dat file.
"""
self.src_file = src_file
self.dat_file = dat_file
self.positions = OrderedDict() # Maintain order of appearance from .src
self.positions = OrderedDict() # Maintain order of appearance
self.labels_to_extract = [] # Store labels found in .src (e.g., XP310, XP311)
def parse_src(self):
"""
Parses the .src file to extract all position references (labels like P1, P2, etc.).
These are later used to match against coordinate data from the .dat file.
Parses the .src file to extract motion commands and their labels (e.g., PTP XP310).
"""
pattern = re.compile(r"PDAT_ACT=([A-Z]+\d+)", re.IGNORECASE)
with open(self.src_file, 'r') as file:
for line in file:
match = pattern.search(line)
if match:
pos_ref = match.group(1).upper()
self.positions[pos_ref] = {} # Placeholder for coordinates
move_pattern = re.compile(r"\bPTP\s+(\w+)", re.IGNORECASE)
print("📌 Extracted labels from .src:", self.positions.keys())
with open(self.src_file, 'r', encoding='utf-8') as file:
for line in file:
match = move_pattern.search(line)
if match:
label = match.group(1).strip().upper()
if label not in self.labels_to_extract:
self.labels_to_extract.append(label)
print(f"📌 Found labels in .src: {self.labels_to_extract}")
def parse_dat(self):
"""
Parses the .dat file and retrieves Cartesian coordinate data for each known position reference.
Matches only those positions previously found in the .src file.
Parses the .dat file and retrieves Cartesian coordinates for each label.
"""
pos_pattern = re.compile(
r"DECL E6POS ([A-Z]+\d+)=\{X ([^,]+),Y ([^,]+),Z ([^,]+),A ([^,]+),B ([^,]+),C ([^,]+),S ([^,]+),T ([^,]+)",
re.IGNORECASE
)
pos_pattern = re.compile(r"DECL\s+E6POS\s+(\w+)\s*=\s*\{([^}]*)\}", re.IGNORECASE)
with open(self.dat_file, 'r') as file:
with open(self.dat_file, 'r', encoding='utf-8') as file:
for line in file:
match = pos_pattern.search(line)
if match:
pos_name = match.group(1).upper()
coords = {
'X': float(match.group(2)),
'Y': float(match.group(3)),
'Z': float(match.group(4)),
'A': float(match.group(5)),
'B': float(match.group(6)),
'C': float(match.group(7)),
'S': int(match.group(8)),
'T': int(match.group(9)),
}
# Only store coordinates if the label was found in the .src
if pos_name in self.positions:
self.positions[pos_name] = coords
label = match.group(1).strip().upper()
coords_text = match.group(2)
print("📄 Current line:", line.strip())
coords = {}
for entry in coords_text.split(','):
key_value = entry.strip().split()
if len(key_value) == 2:
key, value = key_value
try:
if key in ["S", "T"]:
coords[key] = int(float(value))
else:
coords[key] = float(value)
except ValueError:
coords[key] = 0 # fallback
print("📌 Looking for positions in .dat:", self.positions.keys())
self.positions[label] = coords
print(f"📥 Parsed {len(self.positions)} positions from .dat")
def export_csv(self, output_file):
"""
Writes the extracted position data into a CSV file.
Args:
output_file (str): Path to the output CSV file.
Writes the extracted Cartesian positions into a structured CSV file,
skipping any deleted/missing points.
"""
fieldnames = ["Sequence", "PosRef", "X", "Y", "Z", "A", "B", "C", "S", "T"]
with open(output_file, 'w', newline='') as csv_file:
with open(output_file, 'w', newline='', encoding='utf-8') as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
for idx, (pos_ref, coords) in enumerate(self.positions.items()):
if coords: # Skip empty entries (e.g. unmatched labels)
writer.writerow({
"Sequence": idx,
"PosRef": pos_ref,
**coords
})
sequence_number = 0 # Only count real points
print("📥 Final positions extracted:", self.positions)
for label in self.labels_to_extract:
coords = self.positions.get(label)
if coords:
writer.writerow({
"Sequence": sequence_number,
"PosRef": label,
"X": coords.get("X", 0),
"Y": coords.get("Y", 0),
"Z": coords.get("Z", 0),
"A": coords.get("A", 0),
"B": coords.get("B", 0),
"C": coords.get("C", 0),
"S": coords.get("S", 0),
"T": coords.get("T", 0),
})
sequence_number += 1
else:
print(f"⚠️ Skipped missing/deleted point: {label}")
print(f"✅ CSV exported successfully to {output_file} with {sequence_number} points.")
# Optional CLI usage

View File

@ -15,6 +15,54 @@ from src.RSIPI.live_plotter import LivePlotter
from threading import Thread
import asyncio
def generate_report(filename, format_type):
"""
Generate a statistical report from a CSV log file.
Args:
filename (str): Path to the CSV file (or base name without .csv).
format_type (str): 'csv', 'json', or 'pdf'
"""
# Ensure filename ends with .csv
if not filename.endswith(".csv"):
filename += ".csv"
if not os.path.exists(filename):
raise FileNotFoundError(f"❌ File not found: {filename}")
df = pd.read_csv(filename)
# Only keep relevant columns (e.g. actual positions)
position_cols = [col for col in df.columns if col.startswith("Receive.RIst.")]
if not position_cols:
raise ValueError("❌ No 'Receive.RIst' position columns found in CSV.")
report_data = {
"Max Position": df[position_cols].max().to_dict(),
"Mean Position": df[position_cols].mean().to_dict(),
}
report_base = filename.replace(".csv", "")
output_path = f"{report_base}_report.{format_type.lower()}"
if format_type == "csv":
pd.DataFrame(report_data).T.to_csv(output_path)
elif format_type == "json":
with open(output_path, "w") as f:
json.dump(report_data, f, indent=4)
elif format_type == "pdf":
fig, ax = plt.subplots()
pd.DataFrame(report_data).T.plot(kind='bar', ax=ax)
ax.set_title("RSI Position Report")
plt.tight_layout()
plt.savefig(output_path)
else:
raise ValueError(f"❌ Unsupported format: {format_type}")
return f"✅ Report saved as {output_path}"
class RSIAPI:
"""RSI API for programmatic control, including alerts, logging, graphing, and data retrieval."""
@ -282,52 +330,6 @@ class RSIAPI:
return f"{alert_type.capitalize()} alert threshold set to {value}"
return "❌ Invalid alert type. Use 'deviation' or 'force'."
def generate_report(self, filename, format_type):
"""
Generate a statistical report from a CSV log file.
Args:
filename (str): Path to the CSV file (or base name without .csv).
format_type (str): 'csv', 'json', or 'pdf'
"""
# Ensure filename ends with .csv
if not filename.endswith(".csv"):
filename += ".csv"
if not os.path.exists(filename):
raise FileNotFoundError(f"❌ File not found: {filename}")
df = pd.read_csv(filename)
# Only keep relevant columns (e.g. actual positions)
position_cols = [col for col in df.columns if col.startswith("Receive.RIst.")]
if not position_cols:
raise ValueError("❌ No 'Receive.RIst' position columns found in CSV.")
report_data = {
"Max Position": df[position_cols].max().to_dict(),
"Mean Position": df[position_cols].mean().to_dict(),
}
report_base = filename.replace(".csv", "")
output_path = f"{report_base}_report.{format_type.lower()}"
if format_type == "csv":
pd.DataFrame(report_data).T.to_csv(output_path)
elif format_type == "json":
with open(output_path, "w") as f:
json.dump(report_data, f, indent=4)
elif format_type == "pdf":
fig, ax = plt.subplots()
pd.DataFrame(report_data).T.plot(kind='bar', ax=ax)
ax.set_title("RSI Position Report")
plt.tight_layout()
plt.savefig(output_path)
else:
raise ValueError(f"❌ Unsupported format: {format_type}")
return f"✅ Report saved as {output_path}"
@staticmethod
def visualise_csv_log(csv_file, export=False):
"""
@ -345,7 +347,6 @@ class RSIAPI:
if export:
visualizer.export_graphs()
## TODO Need to test parsing krl to csv
@staticmethod
def parse_krl_to_csv(src_file, dat_file, output_file):
"""
@ -365,7 +366,6 @@ class RSIAPI:
except Exception as e:
return f"❌ Error parsing KRL files: {e}"
## TODO Need to test injecting RSI code.
@staticmethod
def inject_rsi(input_krl, output_krl=None, rsi_config="RSIGatewayv1.rsi"):
"""

View File

@ -199,7 +199,7 @@ class RSICommandLineInterface:
print(f"{key}: mean_diff={stats['mean_diff']:.3f}, max_diff={stats['max_diff']:.3f}")
elif cmd == "generate_report" and len(parts) in [2, 3]:
output = parts[2] if len(parts) == 3 else "report.txt"
result = self.client.generate_report(parts[1], output)
result = generate_report(parts[1], output)
print(result)
elif cmd == "safety-stop":
self.client.safety_manager.emergency_stop()
@ -279,7 +279,7 @@ class RSICommandLineInterface:
if format_type not in ["csv", "json", "pdf"]:
print("❌ Invalid format. Use 'csv', 'json', or 'pdf'.")
return
self.client.generate_report(filename, format_type)
generate_report(filename, format_type)
print(f"✅ Report generated: {filename}.{format_type}")