# manager.py (1.8 KB)
  1. from __future__ import annotations
  2. from datetime import datetime
  3. from threading import Lock
  4. from typing import Dict, List
  5. from backend.models import TaskCreateRequest, TaskRecord, TaskStatus
  6. class TaskManager:
  7. def __init__(self) -> None:
  8. self._tasks: Dict[str, TaskRecord] = {}
  9. self._lock = Lock()
  10. def create(self, request: TaskCreateRequest) -> TaskRecord:
  11. task = TaskRecord(
  12. title=request.title,
  13. input=request.input,
  14. agent_id=request.agent_id,
  15. metadata=request.metadata,
  16. )
  17. with self._lock:
  18. self._tasks[task.task_id] = task
  19. return task
  20. def get(self, task_id: str) -> TaskRecord:
  21. with self._lock:
  22. return self._tasks[task_id]
  23. def list(self) -> List[TaskRecord]:
  24. with self._lock:
  25. return list(self._tasks.values())
  26. def update_status(self, task_id: str, status: TaskStatus, *, error: str | None = None) -> TaskRecord:
  27. with self._lock:
  28. task = self._tasks[task_id]
  29. task.status = status
  30. task.error = error
  31. task.updated_at = datetime.now()
  32. return task
  33. def complete(self, task_id: str, *, output: str, artifacts: dict) -> TaskRecord:
  34. with self._lock:
  35. task = self._tasks[task_id]
  36. task.output = output
  37. task.artifacts = artifacts
  38. task.status = TaskStatus.completed
  39. task.updated_at = datetime.now()
  40. return task
  41. def fail(self, task_id: str, error: str) -> TaskRecord:
  42. with self._lock:
  43. task = self._tasks[task_id]
  44. task.status = TaskStatus.failed
  45. task.error = error
  46. task.updated_at = datetime.now()
  47. return task
  48. task_manager = TaskManager()