Add test for chunked IF response reading
This commit is contained in:
parent
da176a1363
commit
de58209fd0
1 changed files with 33 additions and 0 deletions
|
|
@ -782,3 +782,36 @@ async def test_do_restore_handles_stdout_read_failure(tmp_path):
|
|||
|
||||
# Should return empty string on failure (player starts fresh)
|
||||
assert result == ""
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_read_response_handles_chunked_data():
|
||||
"""_read_response() correctly handles data arriving in chunks."""
|
||||
player = MagicMock()
|
||||
session = IFSession(player, "/path/to/story.z5")
|
||||
|
||||
mock_process = AsyncMock()
|
||||
mock_process.stdout = AsyncMock()
|
||||
session.process = mock_process
|
||||
|
||||
# Data arrives in multi-byte chunks, last chunk includes prompt
|
||||
chunks = [
|
||||
b"Welcome to Zork.\n",
|
||||
b"West of House\nYou are standing ",
|
||||
b"in an open field.\n>",
|
||||
]
|
||||
chunk_iter = iter(chunks)
|
||||
|
||||
async def read_chunked(n):
|
||||
try:
|
||||
return next(chunk_iter)
|
||||
except StopIteration:
|
||||
return b""
|
||||
|
||||
mock_process.stdout.read = AsyncMock(side_effect=read_chunked)
|
||||
|
||||
result = await session._read_response()
|
||||
|
||||
assert "Welcome to Zork" in result
|
||||
assert "open field" in result
|
||||
assert not result.strip().endswith(">")
|
||||
|
|
|
|||
Loading…
Reference in a new issue