""" RSI TestServer - Python equivalent of KUKA's RSI TestServer.exe Simulates the robot side of an RSI connection. Listens on UDP for XML from a client, and responds with XML containing the current robot state. Usage: python test_server.py python test_server.py --config path/to/RSI_EthernetConfig.xml """ import socket import threading import tkinter as tk from tkinter import messagebox import xml.etree.ElementTree as ET import argparse import copy import sys import os import time sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src")) from RSIPI.config_parser import ConfigParser from RSIPI.xml_handler import XMLGenerator # --- Theme colours --- BG_DARK = "#1e1e2e" BG_MID = "#282840" BG_CARD = "#313150" BG_INPUT = "#3b3b5c" FG = "#cdd6f4" FG_DIM = "#7f849c" FG_BRIGHT = "#ffffff" ACCENT = "#f5a623" ACCENT_HOVER = "#f7bc5e" GREEN = "#a6e3a1" RED = "#f38ba8" BLUE = "#89b4fa" BORDER = "#45475a" AXIS_COLOURS = { "X": "#f38ba8", "Y": "#a6e3a1", "Z": "#89b4fa", "A": "#fab387", "B": "#cba6f7", "C": "#94e2d5", } class RSITestServer: """UDP server simulating a KUKA robot controller for RSI testing.""" def __init__(self, config_file: str): self.config = ConfigParser(config_file) self.network_settings = self.config.get_network_settings() self.port = self.network_settings["port"] self.robot_state = copy.deepcopy(self.config.send_variables) self.receive_template = copy.deepcopy(self.config.receive_variables) self.ipoc = 0 self.correction_step = 0.01 self.running = False self.receive_only = False self.udp_socket = None self.listen_thread = None self.client_address = None self.last_received_xml = "" self.last_sent_xml = "" self.packet_count = 0 self.lock = threading.Lock() def start(self): if self.running: return try: self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.udp_socket.bind(("0.0.0.0", self.port)) self.udp_socket.settimeout(0.5) self.running = True self.packet_count = 0 self.listen_thread = threading.Thread(target=self._listen_loop, daemon=True) self.listen_thread.start() return True except OSError as e: self.running = False return str(e) def stop(self): self.running = False if self.listen_thread: self.listen_thread.join(timeout=2) if self.udp_socket: self.udp_socket.close() self.udp_socket = None self.client_address = None def _listen_loop(self): while self.running: try: data, addr = self.udp_socket.recvfrom(4096) self.client_address = addr xml_string = data.decode("utf-8", errors="replace") with self.lock: self.last_received_xml = xml_string self._process_received(xml_string) self.packet_count += 1 if not self.receive_only: response = self._generate_response() self.last_sent_xml = response self.udp_socket.sendto(response.encode("utf-8"), addr) except socket.timeout: continue except OSError: break except Exception as e: if self.running: print(f"[TestServer] Error: {e}") def _process_received(self, xml_string: str): try: root = ET.fromstring(xml_string) ipoc_elem = root.find("IPOC") if ipoc_elem is not None and ipoc_elem.text: self.ipoc = int(ipoc_elem.text.strip()) except ET.ParseError: self.ipoc = self._extract_ipoc_string(xml_string) def _extract_ipoc_string(self, xml_string: str) -> int: end_tag = "" start_tag = "" end_idx = xml_string.find(end_tag) if end_idx == -1: return self.ipoc start_idx = xml_string.find(start_tag) if start_idx == -1: return self.ipoc try: return int(xml_string[start_idx + len(start_tag):end_idx].strip()) except ValueError: return self.ipoc def _generate_response(self) -> str: state = copy.deepcopy(self.robot_state) state["IPOC"] = self.ipoc return XMLGenerator.generate_receive_xml(state) def adjust_position(self, axis: str, delta: float): with self.lock: if "RIst" in self.robot_state and axis in self.robot_state["RIst"]: self.robot_state["RIst"][axis] += delta def reset_positions(self): with self.lock: if "RIst" in self.robot_state: for axis in self.robot_state["RIst"]: self.robot_state["RIst"][axis] = 0.0 class TestServerGUI: """Dark-themed tkinter GUI for the RSI TestServer.""" def __init__(self, config_file: str): self.server = RSITestServer(config_file) self._prev_packet_count = 0 self._prev_time = time.time() self._packets_per_sec = 0.0 self.recv_text: tk.Text = None # type: ignore[assignment] self.sent_text: tk.Text = None # type: ignore[assignment] self.root = tk.Tk() self.root.title("RSI TestServer") self.root.configure(bg=BG_DARK) self.root.geometry("1100x750") self.root.resizable(True, True) self.root.protocol("WM_DELETE_WINDOW", self._on_close) self._build_ui() self._update_display() # ── helpers ────────────────────────────────────────────────────── def _card(self, parent, **kw): f = tk.Frame(parent, bg=BG_CARD, highlightbackground=BORDER, highlightthickness=1, **kw) return f def _label(self, parent, text="", font=("Segoe UI", 10), fg=FG, **kw): return tk.Label(parent, text=text, font=font, fg=fg, bg=parent["bg"], **kw) def _btn(self, parent, text, command, width=4, accent=False, **kw): bg = ACCENT if accent else BG_INPUT fg_c = BG_DARK if accent else FG hover = ACCENT_HOVER if accent else "#4e4e7a" b = tk.Button(parent, text=text, command=command, width=width, font=("Segoe UI", 10, "bold"), fg=fg_c, bg=bg, activeforeground=fg_c, activebackground=hover, relief="flat", cursor="hand2", bd=0, highlightthickness=0, **kw) b.bind("", lambda e, b=b, h=hover: b.config(bg=h)) b.bind("", lambda e, b=b, n=bg: b.config(bg=n)) return b # ── layout ─────────────────────────────────────────────────────── def _build_ui(self): # ── Header ── header = tk.Frame(self.root, bg=BG_MID, height=56) header.grid(row=0, column=0, sticky="ew") header.grid_columnconfigure(1, weight=1) self._label(header, "RSI TestServer", font=("Segoe UI", 16, "bold"), fg=ACCENT).grid( row=0, column=0, padx=16, pady=10, sticky="w") self._label(header, f"Port {self.server.port}", font=("Segoe UI Semibold", 11), fg=FG_DIM).grid( row=0, column=1, sticky="w") self.status_dot = tk.Canvas(header, width=14, height=14, bg=BG_MID, highlightthickness=0) self.status_dot.create_oval(2, 2, 12, 12, fill=RED, outline="", tags="dot") self.status_dot.grid(row=0, column=2, padx=(0, 4)) self.status_label = self._label(header, "Offline", font=("Segoe UI Semibold", 11), fg=RED) self.status_label.grid(row=0, column=3, padx=(0, 16)) # ── Body ── body = tk.Frame(self.root, bg=BG_DARK) body.grid(row=1, column=0, sticky="nsew", padx=12, pady=8) body.grid_columnconfigure(1, weight=1) # ── Left column: controls + jog ── left = tk.Frame(body, bg=BG_DARK) left.grid(row=0, column=0, sticky="n", padx=(0, 8)) # Controls card ctrl = self._card(left) ctrl.grid(row=0, column=0, sticky="ew", pady=(0, 8)) btn_row = tk.Frame(ctrl, bg=BG_CARD) btn_row.grid(row=0, column=0, padx=10, pady=10, sticky="ew") self.start_btn = self._btn(btn_row, "Start", self._start, width=8, accent=True) self.start_btn.grid(row=0, column=0, padx=(0, 4)) self.stop_btn = self._btn(btn_row, "Stop", self._stop, width=8) self.stop_btn.config(state="disabled") self.stop_btn.grid(row=0, column=1, padx=(0, 12)) self.mode_var = tk.StringVar(value="Send + Recv") mode_btn = tk.Menubutton(btn_row, textvariable=self.mode_var, font=("Segoe UI", 9), fg=FG, bg=BG_INPUT, activeforeground=FG, activebackground="#4e4e7a", relief="flat", highlightthickness=0, indicatoron=True, cursor="hand2", width=12) mode_menu = tk.Menu(mode_btn, tearoff=0, bg=BG_INPUT, fg=FG, activebackground=ACCENT, activeforeground=BG_DARK) mode_menu.add_command(label="Send + Recv", command=lambda: self._set_mode("Send + Recv")) mode_menu.add_command(label="Recv only", command=lambda: self._set_mode("Recv only")) mode_btn["menu"] = mode_menu mode_btn.grid(row=0, column=2, padx=(0, 8)) self._label(btn_row, "Step:", font=("Segoe UI", 9), fg=FG_DIM).grid( row=0, column=3) self.step_var = tk.StringVar(value="0.01") step_e = tk.Entry(btn_row, textvariable=self.step_var, width=7, font=("Consolas", 10), fg=ACCENT, bg=BG_INPUT, insertbackground=ACCENT, relief="flat", highlightthickness=1, highlightcolor=ACCENT, highlightbackground=BORDER) step_e.grid(row=0, column=4, padx=4) step_e.bind("", self._step_changed) step_e.bind("", self._step_changed) # Jog card jog = self._card(left) jog.grid(row=1, column=0, sticky="ew") jog_title = tk.Frame(jog, bg=BG_CARD) jog_title.grid(row=0, column=0, columnspan=4, sticky="ew", padx=10, pady=(8, 4)) self._label(jog_title, "Jog Control", font=("Segoe UI Semibold", 11), fg=FG_BRIGHT).pack( side="left") axes = ["X", "Y", "Z", "A", "B", "C"] self.pos_labels = {} for i, axis in enumerate(axes): row_frame = tk.Frame(jog, bg=BG_CARD) row_frame.grid(row=i + 1, column=0, sticky="ew", padx=10, pady=2) row_frame.grid_columnconfigure(1, weight=1) colour = AXIS_COLOURS[axis] # Colour pip pip = tk.Canvas(row_frame, width=8, height=8, bg=BG_CARD, highlightthickness=0) pip.create_oval(1, 1, 7, 7, fill=colour, outline="") pip.grid(row=0, column=0, padx=(0, 6)) self._label(row_frame, f"{axis}", font=("Segoe UI Semibold", 11), fg=colour).grid( row=0, column=1, sticky="w") self.pos_labels[axis] = self._label( row_frame, "0.0000", font=("Consolas", 12), fg=FG_BRIGHT) self.pos_labels[axis].grid(row=0, column=2, padx=8) self._btn(row_frame, "+", lambda a=axis: self._adjust(a, 1), width=3).grid( row=0, column=3, padx=1) self._btn(row_frame, "-", lambda a=axis: self._adjust(a, -1), width=3).grid( row=0, column=4, padx=1) reset_row = tk.Frame(jog, bg=BG_CARD) reset_row.grid(row=len(axes) + 1, column=0, sticky="ew", padx=10, pady=(6, 10)) self._btn(reset_row, "Reset All", self._reset, width=36, accent=False).pack(fill="x") # ── Right column: state + xml ── right = tk.Frame(body, bg=BG_DARK) right.grid(row=0, column=1, sticky="nsew") right.grid_rowconfigure(1, weight=1) # Info bar info = self._card(right) info.grid(row=0, column=0, sticky="ew", pady=(0, 8)) info_inner = tk.Frame(info, bg=BG_CARD) info_inner.grid(padx=10, pady=8, sticky="ew") info.grid_columnconfigure(0, weight=1) info_inner.grid_columnconfigure(1, weight=1) info_inner.grid_columnconfigure(3, weight=1) self._label(info_inner, "IPOC", font=("Segoe UI", 9), fg=FG_DIM).grid( row=0, column=0, sticky="w") self.ipoc_val = self._label(info_inner, "0", font=("Consolas", 12, "bold"), fg=BLUE) self.ipoc_val.grid(row=1, column=0, sticky="w") self._label(info_inner, "Client", font=("Segoe UI", 9), fg=FG_DIM).grid( row=0, column=1, sticky="w", padx=(20, 0)) self.client_val = self._label(info_inner, "--", font=("Consolas", 11), fg=FG) self.client_val.grid(row=1, column=1, sticky="w", padx=(20, 0)) self._label(info_inner, "Packets/s", font=("Segoe UI", 9), fg=FG_DIM).grid( row=0, column=2, sticky="w", padx=(20, 0)) self.pps_val = self._label(info_inner, "0", font=("Consolas", 12, "bold"), fg=GREEN) self.pps_val.grid(row=1, column=2, sticky="w", padx=(20, 0)) # Robot state card — grid layout state_card = self._card(right) state_card.grid(row=1, column=0, sticky="nsew", pady=(0, 8)) state_hdr = tk.Frame(state_card, bg=BG_CARD) state_hdr.grid(row=0, column=0, sticky="ew", padx=10, pady=(8, 4)) self._label(state_hdr, "Robot State", font=("Segoe UI Semibold", 11), fg=FG_BRIGHT).pack( side="left") # Scrollable grid area state_outer = tk.Frame(state_card, bg=BG_MID) state_outer.grid(row=1, column=0, sticky="nsew", padx=8, pady=(0, 8)) state_card.grid_rowconfigure(1, weight=1) state_canvas = tk.Canvas(state_outer, bg=BG_MID, highlightthickness=0, height=280) state_canvas.pack(side="left", fill="both", expand=True) state_scroll = tk.Scrollbar(state_outer, orient="vertical", command=state_canvas.yview, bg=BG_MID, troughcolor=BG_MID) state_scroll.pack(side="right", fill="y") state_canvas.configure(yscrollcommand=state_scroll.set) self._state_grid = tk.Frame(state_canvas, bg=BG_MID) state_canvas.create_window((0, 0), window=self._state_grid, anchor="nw") self._state_canvas = state_canvas # Build grid from robot_state self._state_cells = {} # (group, subkey) -> Label self._build_state_grid() # XML card xml_card = self._card(right) xml_card.grid(row=2, column=0, sticky="nsew") xml_hdr = tk.Frame(xml_card, bg=BG_CARD) xml_hdr.grid(row=0, column=0, columnspan=2, sticky="ew", padx=10, pady=(8, 4)) self._label(xml_hdr, "XML Traffic", font=("Segoe UI Semibold", 11), fg=FG_BRIGHT).pack( side="left") xml_card.grid_columnconfigure(1, weight=1) for i, (lbl, attr) in enumerate([("RX", "recv_text"), ("TX", "sent_text")]): tag_bg = GREEN if lbl == "RX" else BLUE tag = tk.Label(xml_card, text=f" {lbl} ", font=("Consolas", 9, "bold"), fg=BG_DARK, bg=tag_bg) tag.grid(row=i + 1, column=0, sticky="nw", padx=(10, 4), pady=2) t = tk.Text(xml_card, width=60, height=6, font=("Consolas", 9), fg=FG_DIM, bg=BG_MID, relief="flat", bd=0, padx=6, pady=4, wrap="word", state="disabled") t.grid(row=i + 1, column=1, sticky="nsew", padx=(0, 8), pady=2) setattr(self, attr, t) # bottom pad tk.Frame(xml_card, bg=BG_CARD, height=8).grid( row=3, column=0, columnspan=2) # ── state grid builder ──────────────────────────────────────────── def _build_state_grid(self): """Build the header row and value cells from robot_state structure.""" grid = self._state_grid row = 0 for key, value in self.server.robot_state.items(): if isinstance(value, dict): # Group header self._label(grid, key, font=("Segoe UI Semibold", 9), fg=ACCENT).grid(row=row, column=0, sticky="w", padx=(8, 4), pady=(6, 0)) row += 1 # Sub-key headers + value cells in columns subkeys = list(value.keys()) # Chunk into rows of 6 for wide dicts like Tech chunk_size = 6 for chunk_start in range(0, len(subkeys), chunk_size): chunk = subkeys[chunk_start:chunk_start + chunk_size] for col, sk in enumerate(chunk): # Header self._label(grid, sk, font=("Consolas", 8), fg=FG_DIM).grid( row=row, column=col + 1, padx=4, sticky="e") row += 1 for col, sk in enumerate(chunk): # Value cell cell = self._label(grid, "0.0000", font=("Consolas", 10), fg=FG_BRIGHT) cell.grid(row=row, column=col + 1, padx=4, sticky="e") self._state_cells[(key, sk)] = cell row += 1 else: # Scalar value self._label(grid, key, font=("Segoe UI Semibold", 9), fg=ACCENT).grid(row=row, column=0, sticky="w", padx=(8, 4), pady=(6, 0)) cell = self._label(grid, str(value), font=("Consolas", 10), fg=FG_BRIGHT) cell.grid(row=row, column=1, padx=4, sticky="e") self._state_cells[(key, None)] = cell row += 1 # Update scroll region after grid is built grid.update_idletasks() self._state_canvas.configure(scrollregion=self._state_canvas.bbox("all")) def _update_state_grid(self): """Update all cell values from current robot_state.""" for key, value in self.server.robot_state.items(): if isinstance(value, dict): for sk, sv in value.items(): cell = self._state_cells.get((key, sk)) if cell: try: cell.config(text=f"{float(sv):>8.4f}") except (ValueError, TypeError): cell.config(text=str(sv)) else: cell = self._state_cells.get((key, None)) if cell: try: cell.config(text=f"{float(value):>8.4f}") except (ValueError, TypeError): cell.config(text=str(value)) # ── actions ────────────────────────────────────────────────────── def _start(self): result = self.server.start() if result is True: self.status_label.config(text="Online", fg=GREEN) self.status_dot.itemconfig("dot", fill=GREEN) self.start_btn.config(state="disabled", bg=BG_INPUT, cursor="arrow") self.stop_btn.config(state="normal", cursor="hand2") self._prev_packet_count = 0 self._prev_time = time.time() else: messagebox.showerror("Error", f"Port {self.server.port} is already in use.\n{result}") def _stop(self): self.server.stop() self.status_label.config(text="Offline", fg=RED) self.status_dot.itemconfig("dot", fill=RED) self.client_val.config(text="--") self.pps_val.config(text="0") self.start_btn.config(state="normal", bg=ACCENT, cursor="hand2") self.stop_btn.config(state="disabled", cursor="arrow") def _set_mode(self, mode): self.mode_var.set(mode) self.server.receive_only = (mode == "Recv only") def _step_changed(self, event=None): try: self.server.correction_step = float(self.step_var.get()) except ValueError: self.step_var.set(str(self.server.correction_step)) def _adjust(self, axis: str, direction: int): self.server.adjust_position(axis, direction * self.server.correction_step) def _reset(self): self.server.reset_positions() # ── display loop ───────────────────────────────────────────────── def _update_display(self): with self.server.lock: # Position values rist = self.server.robot_state.get("RIst", {}) for axis, label in self.pos_labels.items(): val = rist.get(axis, 0.0) label.config(text=f"{val:+.4f}") # IPOC self.ipoc_val.config(text=str(self.server.ipoc)) # Client if self.server.client_address: self.client_val.config( text=f"{self.server.client_address[0]}:{self.server.client_address[1]}") # Packets/sec now = time.time() dt = now - self._prev_time if dt >= 1.0: delta = self.server.packet_count - self._prev_packet_count self._packets_per_sec = delta / dt self._prev_packet_count = self.server.packet_count self._prev_time = now self.pps_val.config(text=f"{self._packets_per_sec:.0f}") # Robot state grid self._update_state_grid() # XML traffic (pretty-printed) self._set_xml(self.recv_text, self.server.last_received_xml) self._set_xml(self.sent_text, self.server.last_sent_xml) self.root.after(100, self._update_display) @staticmethod def _set_text(widget, text): widget.config(state="normal") widget.delete("1.0", "end") widget.insert("1.0", text) widget.config(state="disabled") @staticmethod def _set_xml(widget, xml_string): """Pretty-print XML into a text widget.""" formatted = xml_string if xml_string.strip(): try: root = ET.fromstring(xml_string) ET.indent(root) formatted = ET.tostring(root, encoding="unicode") except ET.ParseError: pass widget.config(state="normal") widget.delete("1.0", "end") widget.insert("1.0", formatted) widget.config(state="disabled") def _on_close(self): self.server.stop() self.root.destroy() def run(self): self.root.mainloop() def main(): parser = argparse.ArgumentParser(description="RSI TestServer - KUKA Robot Simulator") parser.add_argument( "--config", type=str, default="RSI_EthernetConfig.xml", help="Path to RSI_EthernetConfig.xml" ) args = parser.parse_args() if not os.path.exists(args.config): print(f"Error: Config file not found: {args.config}") sys.exit(1) app = TestServerGUI(args.config) app.run() if __name__ == "__main__": main()