62 lines
2 KiB
Python
62 lines
2 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import json
|
||
|
|
import unittest
|
||
|
|
from types import SimpleNamespace
|
||
|
|
|
||
|
|
from routes.tools import stream_tool_job
|
||
|
|
from tool_job_service import ToolJobService
|
||
|
|
|
||
|
|
|
||
|
|
class DummyRequest:
|
||
|
|
async def is_disconnected(self) -> bool:
|
||
|
|
return False
|
||
|
|
|
||
|
|
|
||
|
|
class ToolStreamRouteTests(unittest.IsolatedAsyncioTestCase):
|
||
|
|
async def test_stream_tool_job_emits_current_and_terminal_updates(self) -> None:
|
||
|
|
gate = asyncio.Event()
|
||
|
|
|
||
|
|
async def send_request(method: str, params: dict[str, object]) -> dict[str, object]:
|
||
|
|
gate.set()
|
||
|
|
return {
|
||
|
|
"tool_name": "demo.tool",
|
||
|
|
"content": '{"ok": true}',
|
||
|
|
"parsed": {"ok": True},
|
||
|
|
"is_json": True,
|
||
|
|
}
|
||
|
|
|
||
|
|
service = ToolJobService(
|
||
|
|
send_request=send_request,
|
||
|
|
timeout_seconds=30.0,
|
||
|
|
retention_seconds=60.0,
|
||
|
|
)
|
||
|
|
runtime = SimpleNamespace(tool_job_service=service)
|
||
|
|
|
||
|
|
job = await service.start_job("demo.tool", {})
|
||
|
|
await gate.wait()
|
||
|
|
|
||
|
|
response = await stream_tool_job(job["job_id"], DummyRequest(), runtime=runtime)
|
||
|
|
iterator = response.body_iterator
|
||
|
|
|
||
|
|
first_chunk = await anext(iterator)
|
||
|
|
self.assertEqual(first_chunk, ": stream-open\n\n")
|
||
|
|
|
||
|
|
second_chunk = await anext(iterator)
|
||
|
|
first_payload = json.loads(second_chunk.removeprefix("data: ").strip())
|
||
|
|
self.assertEqual(first_payload["type"], "tool.job")
|
||
|
|
self.assertEqual(first_payload["job"]["job_id"], job["job_id"])
|
||
|
|
|
||
|
|
if first_payload["job"]["status"] == "completed":
|
||
|
|
terminal_payload = first_payload
|
||
|
|
else:
|
||
|
|
third_chunk = await anext(iterator)
|
||
|
|
terminal_payload = json.loads(third_chunk.removeprefix("data: ").strip())
|
||
|
|
|
||
|
|
self.assertEqual(terminal_payload["job"]["status"], "completed")
|
||
|
|
self.assertEqual(terminal_payload["job"]["result"]["tool_name"], "demo.tool")
|
||
|
|
|
||
|
|
await iterator.aclose()
|
||
|
|
await service.shutdown()
|