185 lines
6.7 KiB
Python
185 lines
6.7 KiB
Python
"""Tests for IOAPI."""
|
|
import pytest
|
|
import sys
|
|
import os
|
|
from unittest.mock import MagicMock
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
|
|
|
|
from RSIPI.io_api import IOAPI
|
|
from RSIPI.exceptions import RSIVariableError
|
|
|
|
|
|
def _make_mock_client(send_vars=None, receive_vars=None):
|
|
"""
|
|
Build a mock RSIClient with plain dict send/receive variables
|
|
and a safety_manager that passes everything through.
|
|
"""
|
|
client = MagicMock()
|
|
client.send_variables = send_vars if send_vars is not None else {}
|
|
client.receive_variables = receive_vars if receive_vars is not None else {}
|
|
|
|
# Safety manager: validate returns the value unchanged
|
|
client.safety_manager.validate.side_effect = lambda path, val: val
|
|
|
|
return client
|
|
|
|
|
|
@pytest.fixture
|
|
def io_with_digout():
|
|
"""IOAPI with Digout in receive_variables and Digin in send_variables."""
|
|
send = {
|
|
"Digin": {"i1": 1, "i2": 0, "i3": 1},
|
|
"IPOC": 12345,
|
|
}
|
|
recv = {
|
|
"Digout": {"o1": 0, "o2": 0, "o3": 0},
|
|
"DiO": 0,
|
|
}
|
|
client = _make_mock_client(send_vars=send, receive_vars=recv)
|
|
return IOAPI(client)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# toggle
|
|
# ---------------------------------------------------------------------------
|
|
class TestToggle:
|
|
"""Tests for IOAPI.toggle()."""
|
|
|
|
def test_toggle_sets_value_via_update_variable(self, io_with_digout):
|
|
"""toggle should call _tools.update_variable with group.name and int state."""
|
|
io = io_with_digout
|
|
io._tools = MagicMock()
|
|
io._tools.update_variable.return_value = "Updated Digout.o1 to 1"
|
|
|
|
result = io.toggle("Digout", "o1", True)
|
|
|
|
io._tools.update_variable.assert_called_once_with("Digout.o1", 1)
|
|
assert result == "Updated Digout.o1 to 1"
|
|
|
|
def test_toggle_false_sends_zero(self, io_with_digout):
|
|
"""toggle with False should send 0."""
|
|
io = io_with_digout
|
|
io._tools = MagicMock()
|
|
io._tools.update_variable.return_value = "Updated Digout.o2 to 0"
|
|
|
|
io.toggle("Digout", "o2", False)
|
|
io._tools.update_variable.assert_called_once_with("Digout.o2", 0)
|
|
|
|
def test_toggle_int_one(self, io_with_digout):
|
|
"""toggle with int 1 should send 1."""
|
|
io = io_with_digout
|
|
io._tools = MagicMock()
|
|
io._tools.update_variable.return_value = "Updated Digout.o3 to 1"
|
|
|
|
io.toggle("Digout", "o3", 1)
|
|
io._tools.update_variable.assert_called_once_with("Digout.o3", 1)
|
|
|
|
def test_toggle_truthy_value_normalizes_to_one(self, io_with_digout):
|
|
"""Any truthy value should normalize to 1."""
|
|
io = io_with_digout
|
|
io._tools = MagicMock()
|
|
io._tools.update_variable.return_value = "ok"
|
|
|
|
io.toggle("Digout", "o1", 42)
|
|
io._tools.update_variable.assert_called_once_with("Digout.o1", 1)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# set_output
|
|
# ---------------------------------------------------------------------------
|
|
class TestSetOutput:
|
|
"""Tests for IOAPI.set_output()."""
|
|
|
|
def test_formats_channel_name(self, io_with_digout):
|
|
"""set_output(3, True) should call toggle with 'o3'."""
|
|
io = io_with_digout
|
|
io._tools = MagicMock()
|
|
io._tools.update_variable.return_value = "Updated Digout.o3 to 1"
|
|
|
|
io.set_output(3, True)
|
|
io._tools.update_variable.assert_called_once_with("Digout.o3", 1)
|
|
|
|
def test_custom_group(self, io_with_digout):
|
|
"""set_output with custom group should use that group."""
|
|
io = io_with_digout
|
|
io._tools = MagicMock()
|
|
io._tools.update_variable.return_value = "Updated DiO.o5 to 1"
|
|
|
|
io.set_output(5, True, group="DiO")
|
|
io._tools.update_variable.assert_called_once_with("DiO.o5", 1)
|
|
|
|
def test_set_output_off(self, io_with_digout):
|
|
"""set_output with False should send 0."""
|
|
io = io_with_digout
|
|
io._tools = MagicMock()
|
|
io._tools.update_variable.return_value = "Updated Digout.o1 to 0"
|
|
|
|
io.set_output(1, False)
|
|
io._tools.update_variable.assert_called_once_with("Digout.o1", 0)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_input
|
|
# ---------------------------------------------------------------------------
|
|
class TestGetInput:
|
|
"""Tests for IOAPI.get_input()."""
|
|
|
|
def test_reads_true_from_send_variables(self, io_with_digout):
|
|
"""get_input should read from send_variables (what robot sends us)."""
|
|
result = io_with_digout.get_input(1, group="Digin")
|
|
assert result is True # i1 = 1
|
|
|
|
def test_reads_false_from_send_variables(self, io_with_digout):
|
|
"""get_input for channel with value 0 should return False."""
|
|
result = io_with_digout.get_input(2, group="Digin")
|
|
assert result is False # i2 = 0
|
|
|
|
def test_missing_group_raises(self, io_with_digout):
|
|
"""get_input with non-existent group should raise RSIVariableError."""
|
|
with pytest.raises(RSIVariableError, match="not found in send_variables"):
|
|
io_with_digout.get_input(1, group="NonExistent")
|
|
|
|
def test_missing_channel_raises(self, io_with_digout):
|
|
"""get_input with non-existent channel should raise RSIVariableError."""
|
|
with pytest.raises(RSIVariableError, match="not found in group"):
|
|
io_with_digout.get_input(99, group="Digin")
|
|
|
|
def test_default_group_is_digin(self):
|
|
"""Default group should be 'Digin'."""
|
|
send = {"Digin": {"i1": 1}}
|
|
client = _make_mock_client(send_vars=send)
|
|
io = IOAPI(client)
|
|
assert io.get_input(1) is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# pulse (basic structure test, not timing)
|
|
# ---------------------------------------------------------------------------
|
|
class TestPulse:
|
|
"""Tests for IOAPI.pulse()."""
|
|
|
|
def test_pulse_calls_on_then_off(self, io_with_digout):
|
|
"""pulse should turn output on then off."""
|
|
io = io_with_digout
|
|
io._tools = MagicMock()
|
|
io._tools.update_variable.return_value = "ok"
|
|
|
|
result = io.pulse(1, duration=0.0) # zero duration for fast test
|
|
|
|
calls = io._tools.update_variable.call_args_list
|
|
# First call: ON (value=1), second call: OFF (value=0)
|
|
assert calls[0].args == ("Digout.o1", 1)
|
|
assert calls[1].args == ("Digout.o1", 0)
|
|
assert "Pulse completed" in result
|
|
|
|
def test_pulse_returns_message_with_duration(self, io_with_digout):
|
|
"""Return string should include the duration."""
|
|
io = io_with_digout
|
|
io._tools = MagicMock()
|
|
io._tools.update_variable.return_value = "ok"
|
|
|
|
result = io.pulse(2, duration=0.0)
|
|
assert "0.0s" in result
|
|
assert "Digout.o2" in result
|