Jelajahi Sumber

feat: 优化取消研究任务的功能,优化前端与后端交互

JJSun 3 bulan lalu
induk
melakukan
393b633d4a
19 mengubah file dengan 225 tambahan dan 261 penghapusan
  1. 5 0
      Co-creation-projects/JJason-DeepCastAgent/.vscode/settings.json
  2. 9 0
      Co-creation-projects/JJason-DeepCastAgent/backend/pyproject.toml
  3. 0 19
      Co-creation-projects/JJason-DeepCastAgent/backend/scripts/check_paths.py
  4. 11 6
      Co-creation-projects/JJason-DeepCastAgent/backend/scripts/test_agent_workflow.py
  5. 4 5
      Co-creation-projects/JJason-DeepCastAgent/backend/scripts/test_audio_generator.py
  6. 0 62
      Co-creation-projects/JJason-DeepCastAgent/backend/scripts/test_script_generator.py
  7. 4 2
      Co-creation-projects/JJason-DeepCastAgent/backend/scripts/verify_ecnu_llm.py
  8. 2 1
      Co-creation-projects/JJason-DeepCastAgent/backend/scripts/verify_ecnu_tts.py
  9. 13 13
      Co-creation-projects/JJason-DeepCastAgent/backend/src/__init__.py
  10. 15 40
      Co-creation-projects/JJason-DeepCastAgent/backend/src/agent.py
  11. 11 29
      Co-creation-projects/JJason-DeepCastAgent/backend/src/config.py
  12. 81 54
      Co-creation-projects/JJason-DeepCastAgent/backend/src/main.py
  13. 0 10
      Co-creation-projects/JJason-DeepCastAgent/backend/src/models.py
  14. 25 10
      Co-creation-projects/JJason-DeepCastAgent/backend/src/services/audio_generator.py
  15. 7 1
      Co-creation-projects/JJason-DeepCastAgent/backend/src/services/audio_synthesizer.py
  16. 16 4
      Co-creation-projects/JJason-DeepCastAgent/backend/src/services/script_generator.py
  17. 3 3
      Co-creation-projects/JJason-DeepCastAgent/backend/src/services/tool_events.py
  18. 8 2
      Co-creation-projects/JJason-DeepCastAgent/frontend/src/App.vue
  19. 11 0
      Co-creation-projects/JJason-DeepCastAgent/frontend/src/services/api.ts

+ 5 - 0
Co-creation-projects/JJason-DeepCastAgent/.vscode/settings.json

@@ -0,0 +1,5 @@
+{
+    "python.analysis.extraPaths": [
+        "./backend/src"
+    ]
+}

+ 9 - 0
Co-creation-projects/JJason-DeepCastAgent/backend/pyproject.toml

@@ -46,6 +46,14 @@ lint.select = [
     "T20",  # flake8-print
 ]
 lint.ignore = [
+    "D100",  # Missing docstring in public module
+    "D101",  # Missing docstring in public class
+    "D102",  # Missing docstring in public method
+    "D103",  # Missing docstring in public function
+    "D104",  # Missing docstring in public package
+    "D105",  # Missing docstring in magic method
+    "D106",  # Missing docstring in public nested class
+    "D107",  # Missing docstring in __init__
     "D400",  # 中文 docstring 句号检测误报
     "D415",  # 中文 docstring 句号检测误报
     "D212",  # 多行 docstring 首行格式
@@ -55,6 +63,7 @@ lint.ignore = [
 [tool.ruff.lint.per-file-ignores]
 "tests/*" = ["D", "UP"]
 "scripts/*" = ["T201"]
+"backend/scripts/test_agent_workflow.py" = ["E402"]
 
 [tool.ruff.lint.pydocstyle]
 convention = "google"

+ 0 - 19
Co-creation-projects/JJason-DeepCastAgent/backend/scripts/check_paths.py

@@ -1,19 +0,0 @@
-import sys
-import os
-from pathlib import Path
-
-# 添加 src 到 sys.path
-sys.path.append(str(Path(__file__).parent.parent / "src"))
-
-from config import Configuration, BACKEND_ROOT
-
-def main():
-    print(f"BACKEND_ROOT: {BACKEND_ROOT}")
-    
-    config = Configuration.from_env()
-    
-    print(f"Audio Output Dir: {config.audio_output_dir}")
-    print(f"Notes Workspace: {config.notes_workspace}")
-
-if __name__ == "__main__":
-    main()

+ 11 - 6
Co-creation-projects/JJason-DeepCastAgent/backend/scripts/test_agent_workflow.py

@@ -1,4 +1,3 @@
-import os
 import sys
 from pathlib import Path
 
@@ -17,10 +16,12 @@ if env_path.exists():
 else:
     print(f"Warning: .env file not found at {env_path}")
 
-from config import Configuration, SearchAPI
+import logging
+import shutil
+
 from agent import DeepResearchAgent
+from config import Configuration, SearchAPI
 
-import logging
 
 def configure_logging():
     # Set global level to INFO first
@@ -74,7 +75,6 @@ def main():
     output_dir = Path(config.audio_output_dir)
     if output_dir.exists():
         print(f"Cleaning up output directory: {output_dir}")
-        import shutil
         shutil.rmtree(output_dir)
     output_dir.mkdir(parents=True, exist_ok=True)
     
@@ -88,10 +88,15 @@ def main():
         print("WORKFLOW COMPLETED SUCCESSFULLY")
         print("="*50)
         
-        print(f"Report Summary Length: {len(result.running_summary)}")
+        # running_summary may be None; guard against calling len(None)
+        summary_len = len(result.running_summary) if result.running_summary else 0
+        print(f"Report Summary Length: {summary_len}")
         
         print("\n" + "="*20 + " REPORT CONTENT " + "="*20)
-        print(result.running_summary)
+        if result.running_summary:
+            print(result.running_summary)
+        else:
+            print("(No report generated)")
         print("="*56 + "\n")
         
         if result.podcast_script:

+ 4 - 5
Co-creation-projects/JJason-DeepCastAgent/backend/scripts/test_audio_generator.py

@@ -1,14 +1,13 @@
-import unittest
-from unittest.mock import MagicMock, patch, mock_open
-import sys
 import os
-from pathlib import Path
+import sys
+import unittest
+from unittest.mock import MagicMock, mock_open, patch
 
 # Add src to path
 sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../src')))
 
 from services.audio_generator import AudioGenerationService
-from config import Configuration
+
 
 class TestAudioGenerationService(unittest.TestCase):
     def setUp(self):

+ 0 - 62
Co-creation-projects/JJason-DeepCastAgent/backend/scripts/test_script_generator.py

@@ -1,62 +0,0 @@
-import unittest
-from unittest.mock import MagicMock
-import sys
-import os
-
-# Add src to path
-sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../src')))
-
-from services.script_generator import ScriptGenerationService
-from models import SummaryState
-from config import Configuration
-
-class TestScriptGenerationService(unittest.TestCase):
-    def setUp(self):
-        self.mock_agent = MagicMock()
-        self.mock_config = MagicMock()
-        self.mock_config.strip_thinking_tokens = True
-        self.service = ScriptGenerationService(self.mock_agent, self.mock_config)
-
-    def test_generate_script_success(self):
-        state = SummaryState(research_topic="Test Topic")
-        state.structured_report = "# Test Report\nContent..."
-        
-        # Mock LLM response
-        mock_response = """
-        Thinking process...
-        
-        ```json
-        [
-            {"role": "Host", "content": "Hello"},
-            {"role": "Guest", "content": "Hi there"}
-        ]
-        ```
-        """
-        self.mock_agent.run.return_value = mock_response
-
-        script = self.service.generate_script(state)
-        
-        self.assertEqual(len(script), 2)
-        self.assertEqual(script[0]['role'], "Host")
-        self.assertEqual(script[0]['content'], "Hello")
-        self.assertEqual(script[1]['role'], "Guest")
-        self.assertEqual(script[1]['content'], "Hi there")
-
-    def test_generate_script_no_report(self):
-        state = SummaryState(research_topic="Test Topic")
-        state.structured_report = None
-        
-        script = self.service.generate_script(state)
-        self.assertEqual(script, [])
-
-    def test_generate_script_invalid_json(self):
-        state = SummaryState(research_topic="Test Topic")
-        state.structured_report = "Report"
-        
-        self.mock_agent.run.return_value = "Not JSON"
-        
-        script = self.service.generate_script(state)
-        self.assertEqual(script, [])
-
-if __name__ == '__main__':
-    unittest.main()

+ 4 - 2
Co-creation-projects/JJason-DeepCastAgent/backend/scripts/verify_ecnu_llm.py

@@ -1,5 +1,6 @@
 import os
 import sys
+
 import requests
 from dotenv import load_dotenv
 
@@ -10,6 +11,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../src'
 load_dotenv(os.path.join(os.path.dirname(__file__), '../.env'))
 
 def test_llm_api():
+    """测试 ECNU LLM API 是否可用。"""
     api_key = os.getenv("LLM_API_KEY")
     base_url = os.getenv("LLM_BASE_URL", "https://chat.ecnu.edu.cn/open/api/v1")
     model = os.getenv("LLM_MODEL_ID", "ecnu-max")
@@ -20,7 +22,7 @@ def test_llm_api():
     else:
         url = base_url
 
-    print(f"Testing LLM API...")
+    print("Testing LLM API...")
     print(f"URL: {url}")
     print(f"Model: {model}")
     print(f"API Key: {api_key[:8]}..." if api_key else "API Key: None")
@@ -48,7 +50,7 @@ def test_llm_api():
         if response.status_code == 200:
             result = response.json()
             content = result.get("choices", [{}])[0].get("message", {}).get("content", "")
-            print(f"✅ Success!")
+            print("✅ Success!")
             print(f"Response: {content}")
         else:
             print(f"❌ Failed with status {response.status_code}")

+ 2 - 1
Co-creation-projects/JJason-DeepCastAgent/backend/scripts/verify_ecnu_tts.py

@@ -1,5 +1,6 @@
 import os
 import sys
+
 import requests
 from dotenv import load_dotenv
 
@@ -14,7 +15,7 @@ def test_tts_api():
     base_url = os.getenv("TTS_BASE_URL", "https://chat.ecnu.edu.cn/open/api/v1/audio/speech")
     model = os.getenv("TTS_MODEL", "ecnu-tts")
     
-    print(f"Testing TTS API...")
+    print("Testing TTS API...")
     print(f"URL: {base_url}")
     print(f"Model: {model}")
     print(f"API Key: {api_key[:8]}..." if api_key else "API Key: None")

+ 13 - 13
Co-creation-projects/JJason-DeepCastAgent/backend/src/__init__.py

@@ -1,18 +1,18 @@
 """DeepCast - 由 HelloAgents 驱动的自动播客生成代理。"""
 
-__version__ = "0.0.1"
+    __version__ = "0.0.1"
 
-from .agent import DeepResearchAgent
-from .config import Configuration, SearchAPI
-from .models import SummaryState, SummaryStateInput, SummaryStateOutput, TodoItem
+    from .agent import DeepResearchAgent
+    from .config import Configuration, SearchAPI
+    from .models import SummaryState, SummaryStateInput, SummaryStateOutput, TodoItem
 
-__all__ = [
-    "DeepResearchAgent",
-    "Configuration",
-    "SearchAPI",
-    "SummaryState",
-    "SummaryStateInput",
-    "SummaryStateOutput",
-    "TodoItem",
-]
+    __all__ = [
+        "DeepResearchAgent",
+        "Configuration",
+        "SearchAPI",
+        "SummaryState",
+        "SummaryStateInput",
+        "SummaryStateOutput",
+        "TodoItem",
+    ]
 

+ 15 - 40
Co-creation-projects/JJason-DeepCastAgent/backend/src/agent.py

@@ -3,7 +3,6 @@
 from __future__ import annotations
 
 import logging
-import re
 from collections.abc import Callable, Iterator
 from pathlib import Path
 from queue import Empty, Queue
@@ -18,7 +17,6 @@ from config import Configuration
 from models import SummaryState, SummaryStateOutput, TodoItem
 from prompts import (
     report_writer_instructions,
-    script_writer_instructions,
     task_summarizer_instructions,
     todo_planner_system_prompt,
 )
@@ -77,11 +75,6 @@ class DeepResearchAgent:
             system_prompt=report_writer_instructions.strip(),
             llm=self.smart_llm,
         )
-        self.script_agent = self._create_tool_aware_agent(
-            name="脚本策划专家",
-            system_prompt=script_writer_instructions.strip(),
-            llm=self.default_llm,
-        )
 
         self._summarizer_factory: Callable[[], ToolAwareSimpleAgent] = lambda: self._create_tool_aware_agent(  # noqa: E501
             name="任务总结专家",
@@ -92,22 +85,16 @@ class DeepResearchAgent:
         self.planner = PlanningService(self.todo_agent, self.config)
         self.summarizer = SummarizationService(self._summarizer_factory, self.config)
         self.reporting = ReportingService(self.report_agent, self.config)
-        self.script_generator = ScriptGenerationService(self.script_agent, self.config)
+        self.script_generator = ScriptGenerationService(self.config)
         self.audio_generator = AudioGenerationService(self.config)
 
         self.podcast_synthesizer = PodcastSynthesisService(self.config)
-        self._last_search_notices: list[str] = []
 
     def cancel(self) -> None:
         """请求取消当前正在执行的研究任务。"""
         logger.info("Cancel requested for research agent")
         self._cancel_event.set()
 
-    def _check_cancelled(self) -> None:
-        """检查是否收到取消请求,如果是则抛出 CancelledException。"""
-        if self._cancel_event.is_set():
-            raise CancelledException("研究任务已被用户取消")
-
     def is_cancelled(self) -> bool:
         """检查当前任务是否已被取消。"""
         return self._cancel_event.is_set()
@@ -456,14 +443,17 @@ class DeepResearchAgent:
                 "total": total,
                 "role": role,
                 "preview": preview,
-                "message": f"[TTS {current}/{total}] 正在为 {role} 生成语音: {preview}",
+                "message": f"[TTS {current}/{total}] ✓ {role} 语音生成成功",
             })
             return True  # 返回 True 表示继续
         
         def run_audio_generation():
             """在单独线程中运行音频生成"""
             try:
-                files = self.audio_generator.generate_audio(script, task_id, audio_progress_callback)
+                files = self.audio_generator.generate_audio(
+                    script, task_id, audio_progress_callback,
+                    cancel_event=self._cancel_event,
+                )
                 audio_result.append(files)
             except Exception as e:
                 if not self.is_cancelled():
@@ -501,7 +491,7 @@ class DeepResearchAgent:
                 if event.get("type") == "audio_progress":
                     yield {
                         "type": "log", 
-                        "message": f"[TTS {event['current']}/{event['total']}] ✓ {event['role']} 语音生成成功"
+                        "message": f"[TTS {event['current']}/{event['total']}] ✓ {event['role']} 语音已完成"
                     }
             except Empty:
                 continue
@@ -532,8 +522,14 @@ class DeepResearchAgent:
             "stage": "synthesis",
             "message": "正在合成完整播客音频文件...",
         }
+
+        # 检查取消
+        if self.is_cancelled():
+            yield {"type": "cancelled", "message": "研究任务已取消"}
+            return
+
         yield {"type": "log", "message": "使用 FFmpeg 拼接所有语音片段..."}
-        podcast_file = self.podcast_synthesizer.synthesize_podcast(audio_files, task_id)
+        podcast_file = self.podcast_synthesizer.synthesize_podcast(audio_files, task_id, cancel_check=self.is_cancelled)
         if podcast_file:
             yield {
                 "type": "podcast_ready",
@@ -575,7 +571,6 @@ class DeepResearchAgent:
             self.config,
             state.research_loop_count,
         )
-        self._last_search_notices = notices
         task.notices = notices
 
         if emit_stream:
@@ -699,11 +694,6 @@ class DeepResearchAgent:
             return []
         return events
 
-    @property
-    def _tool_call_events(self) -> list[dict[str, Any]]:
-        """为旧版集成暴露记录的工具事件。"""
-        return self._tool_tracker.as_dicts()
-
     def _serialize_task(self, task: TodoItem) -> dict[str, Any]:
         """将任务数据类转换为前端可序列化的字典。"""
         return {
@@ -754,7 +744,7 @@ class DeepResearchAgent:
                     "content": content,
                 }
             )
-            note_id = self._extract_note_id_from_text(response)
+            note_id = self._tool_tracker._extract_note_id(response)
 
         if not note_id:
             return None
@@ -820,19 +810,4 @@ class DeepResearchAgent:
 
         return None
 
-    @staticmethod
-    def _extract_note_id_from_text(response: str) -> str | None:
-        if not response:
-            return None
-
-        match = re.search(r"ID:\s*([^\n]+)", response)
-        if not match:
-            return None
-
-        return match.group(1).strip()
-
 
-def run_deep_research(topic: str, config: Configuration | None = None) -> SummaryStateOutput:
-    """镜像基于类的 API 的便捷函数。"""
-    agent = DeepResearchAgent(config=config)
-    return agent.run(topic)

+ 11 - 29
Co-creation-projects/JJason-DeepCastAgent/backend/src/config.py

@@ -9,7 +9,17 @@ from pydantic import BaseModel, Field, field_validator
 BACKEND_ROOT = Path(__file__).resolve().parent.parent
 
 class SearchAPI(Enum):
-    """搜索 API 提供商(仅支持混合搜索:Tavily + SerpApi)。"""
+    """搜索 API 提供商的枚举。
+
+    兼容旧测试和示例:
+    - TAVILY: 使用 Tavily 搜索后端
+    - SERPAPI: 使用 SerpApi
+    - DDG: DuckDuckGo (内置 ddgs)
+    - HYBRID: 混合策略(Tavily + SerpApi),为默认值
+    """
+    TAVILY = "tavily"
+    SERPAPI = "serpapi"
+    DDG = "ddg"
     HYBRID = "hybrid"
 
 
@@ -147,30 +157,6 @@ class Configuration(BaseModel):
             if env_key in os.environ:
                 raw_values[field_name] = os.environ[env_key]
 
-        # 显式环境名称的额外映射
-        env_aliases = {
-            "llm_provider": os.getenv("LLM_PROVIDER"),
-            "llm_api_key": os.getenv("LLM_API_KEY"),
-            "llm_model_id": os.getenv("LLM_MODEL_ID"),
-            "smart_llm_model": os.getenv("SMART_LLM_MODEL"),
-            "fast_llm_model": os.getenv("FAST_LLM_MODEL"),
-            "llm_base_url": os.getenv("LLM_BASE_URL"),
-            "max_web_research_loops": os.getenv("MAX_WEB_RESEARCH_LOOPS"),
-            "fetch_full_page": os.getenv("FETCH_FULL_PAGE"),
-            "strip_thinking_tokens": os.getenv("STRIP_THINKING_TOKENS"),
-            "use_tool_calling": os.getenv("USE_TOOL_CALLING"),
-            "search_api": os.getenv("SEARCH_API"),
-            "enable_notes": os.getenv("ENABLE_NOTES"),
-            "notes_workspace": os.getenv("NOTES_WORKSPACE"),
-            "tts_api_key": os.getenv("TTS_API_KEY"),
-            "tts_base_url": os.getenv("TTS_BASE_URL"),
-            "tts_model": os.getenv("TTS_MODEL"),
-            "audio_output_dir": os.getenv("AUDIO_OUTPUT_DIR"),
-            "ffmpeg_path": os.getenv("FFMPEG_PATH"),
-            "tavily_api_key": os.getenv("TAVILY_API_KEY"),
-            "serpapi_api_key": os.getenv("SERPAPI_API_KEY"),
-        }
-
         # 处理 NO_PROXY
         no_proxy = os.getenv("NO_PROXY")
         if no_proxy:
@@ -178,10 +164,6 @@ class Configuration(BaseModel):
             # 同时设置为小写以兼容
             os.environ["no_proxy"] = no_proxy
 
-        for key, value in env_aliases.items():
-            if value is not None:
-                raw_values.setdefault(key, value)
-
         if overrides:
             for key, value in overrides.items():
                 if value is not None:

+ 81 - 54
Co-creation-projects/JJason-DeepCastAgent/backend/src/main.py

@@ -75,6 +75,9 @@ def create_app() -> FastAPI:
     """创建并配置 FastAPI 应用实例。"""
     app = FastAPI(title="DeepCast - 自动播客生成智能体")
 
+    # 当前活跃的研究 agent 引用,用于支持取消操作
+    _active_agent: dict[str, DeepResearchAgent | None] = {"current": None}
+
     app.add_middleware(
         CORSMiddleware,
         allow_origins=["*"],
@@ -182,6 +185,20 @@ def create_app() -> FastAPI:
             podcast_script=podcast_resp,
         )
 
+    @app.post("/research/cancel")
+    async def cancel_research() -> dict[str, str]:
+        """
+        主动取消当前正在执行的研究任务。
+        
+        前端可以通过此端点显式通知后端停止处理。
+        """
+        agent = _active_agent.get("current")
+        if agent and not agent.is_cancelled():
+            logger.info("Cancel requested via /research/cancel endpoint")
+            agent.cancel()
+            return {"status": "cancelled", "message": "取消请求已发送"}
+        return {"status": "no_task", "message": "当前没有正在运行的任务"}
+
     @app.post("/research/stream")
     async def stream_research(payload: ResearchRequest, request: Request) -> StreamingResponse:
         """
@@ -193,66 +210,76 @@ def create_app() -> FastAPI:
         try:
             config = _build_config(payload)
             agent = DeepResearchAgent(config=config)
+            _active_agent["current"] = agent  # 注册活跃 agent 以支持取消
         except ValueError as exc:
             raise HTTPException(status_code=400, detail=str(exc)) from exc
 
         async def event_iterator():
+            import concurrent.futures
+
+            loop = asyncio.get_event_loop()
+            # 用 asyncio.Queue 桥接同步生成器和异步循环
+            # 生成器在单一后台线程中完整运行,避免并发调用 next() 破坏生成器状态
+            event_queue: asyncio.Queue = asyncio.Queue()
+            _SENTINEL = object()  # 生成器结束的哨兵值
+
+            def run_generator():
+                """在后台线程中完整运行生成器,将事件逐一推入异步队列。"""
+                try:
+                    for event in agent.run_stream(payload.topic):
+                        loop.call_soon_threadsafe(event_queue.put_nowait, event)
+                except Exception as exc:
+                    logger.exception("Generator raised exception")
+                    loop.call_soon_threadsafe(
+                        event_queue.put_nowait,
+                        {"type": "error", "detail": str(exc)},
+                    )
+                finally:
+                    loop.call_soon_threadsafe(event_queue.put_nowait, _SENTINEL)
+
+            # 启动断开连接监控任务
+            async def monitor_disconnect():
+                while True:
+                    if await request.is_disconnected():
+                        logger.info("Client disconnected detected by monitor")
+                        agent.cancel()
+                        return
+                    await asyncio.sleep(0.5)
+
+            monitor_task = asyncio.create_task(monitor_disconnect())
+            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
+            loop.run_in_executor(executor, run_generator)
+
             try:
-                # 在线程池中运行同步生成器
-                import concurrent.futures
-                with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
-                    # 将生成器转换为可在线程中运行的迭代
-                    gen = agent.run_stream(payload.topic)
-                    
-                    # 启动一个后台任务来监控连接状态
-                    async def monitor_disconnect():
-                        while True:
-                            if await request.is_disconnected():
-                                logger.info("Client disconnected detected by monitor")
-                                agent.cancel()
-                                return True
-                            await asyncio.sleep(0.5)
-                    
-                    monitor_task = asyncio.create_task(monitor_disconnect())
-                    
+                while True:
                     try:
-                        while True:
-                            # 检查是否已取消
-                            if agent.is_cancelled():
-                                logger.info("✅ 本次任务已取消")
-                                yield f'data: {{"type": "cancelled", "message": "研究任务已被用户取消"}}\\n\\n'
-                                break
-                            
-                            # 在线程池中获取下一个事件
-                            loop = asyncio.get_event_loop()
-                            try:
-                                event = await asyncio.wait_for(
-                                    loop.run_in_executor(executor, lambda: next(gen, None)),
-                                    timeout=0.5
-                                )
-                            except asyncio.TimeoutError:
-                                # 超时时继续检查连接状态
-                                continue
-                            
-                            if event is None:
-                                break
-                                
-                            yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
-                            
-                            # 如果是完成或取消事件,退出循环
-                            if event.get("type") in ("done", "cancelled", "error"):
-                                break
-                    finally:
-                        monitor_task.cancel()
-                        try:
-                            await monitor_task
-                        except asyncio.CancelledError:
-                            pass
-                            
-            except Exception as exc:  # pragma: no cover - defensive guardrail
-                logger.exception("Streaming research failed")
-                error_payload = {"type": "error", "detail": str(exc)}
-                yield f"data: {json.dumps(error_payload, ensure_ascii=False)}\n\n"
+                        # 带超时等待,以便能及时响应取消
+                        item = await asyncio.wait_for(event_queue.get(), timeout=1.0)
+                    except asyncio.TimeoutError:
+                        # 超时时检查是否已取消(用于客户端断开但生成器还未感知的情况)
+                        if agent.is_cancelled():
+                            logger.info("✅ 本次任务已取消(超时检测)")
+                            yield 'data: {"type": "cancelled", "message": "研究任务已被用户取消"}\n\n'
+                        continue
+
+                    # 哨兵:生成器已结束
+                    if item is _SENTINEL:
+                        break
+
+                    event = item
+                    yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
+
+                    if event.get("type") in ("done", "cancelled", "error"):
+                        break
+            finally:
+                monitor_task.cancel()
+                try:
+                    await monitor_task
+                except asyncio.CancelledError:
+                    pass
+                # 不等待后台线程(daemon),立即返回响应
+                executor.shutdown(wait=False)
+                _active_agent["current"] = None
 
         return StreamingResponse(
             event_iterator(),

+ 0 - 10
Co-creation-projects/JJason-DeepCastAgent/backend/src/models.py

@@ -41,16 +41,6 @@ class SummaryState:
     podcast_script: list | None = field(default=None)  # 播客脚本(JSON 字符串)
 
 
-@dataclass(kw_only=True)
-class SummaryStateInput:
-    """深度研究工作流的输入状态模型。
-    
-    用于指定研究主题。
-    """
-
-    research_topic: str | None = field(default=None)  # 研究主题
-
-
 @dataclass(kw_only=True)
 class SummaryStateOutput:
     """深度研究工作流的输出状态模型。

+ 25 - 10
Co-creation-projects/JJason-DeepCastAgent/backend/src/services/audio_generator.py

@@ -5,6 +5,7 @@ from __future__ import annotations
 import logging
 from collections.abc import Callable
 from pathlib import Path
+from threading import Event
 
 import requests
 
@@ -44,7 +45,8 @@ class AudioGenerationService:
         self, 
         script: list[dict[str, str]], 
         task_id: str = "default",
-        progress_callback: Callable[[int, int, str, str], bool | None] | None = None
+        progress_callback: Callable[[int, int, str, str], bool | None] | None = None,
+        cancel_event: Event | None = None,
     ) -> list[str]:
         """
         为给定的脚本生成音频文件。
@@ -54,6 +56,7 @@ class AudioGenerationService:
             task_id: 当前任务/会话的唯一标识符
             progress_callback: 可选的进度回调函数,签名为 (current, total, role, content_preview) -> Optional[bool]
                               返回 False 表示应该停止生成,返回 True 或 None 表示继续
+            cancel_event: 可选的取消事件,set 时立即停止生成
             
         Returns:
             生成的音频文件的路径列表
@@ -75,14 +78,11 @@ class AudioGenerationService:
             
             if not role or not content:
                 continue
-            
-            # 调用进度回调,检查是否应该停止
-            if progress_callback:
-                content_preview = content[:30] + "..." if len(content) > 30 else content
-                should_continue = progress_callback(index + 1, total, role, content_preview)
-                if should_continue is False:
-                    logger.info("Audio generation cancelled by callback")
-                    break
+
+            # 直接检查取消事件(最可靠的方式)
+            if cancel_event and cancel_event.is_set():
+                logger.info("Audio generation cancelled before TTS %d/%d (cancel_event)", index + 1, total)
+                break
                 
             voice_id = self._get_voice_for_role(role)
             if not voice_id:
@@ -97,6 +97,19 @@ class AudioGenerationService:
             if self._call_tts_api(content, voice_id, file_path):
                 generated_files.append(str(file_path))
                 logger.info("[TTS %d/%d] ✓ %s 语音生成成功", index + 1, total, role)
+                
+                # TTS 完成后再次检查取消
+                if cancel_event and cancel_event.is_set():
+                    logger.info("Audio generation cancelled after TTS %d/%d (cancel_event)", index + 1, total)
+                    break
+                
+                # 在 TTS 成功之后才调用进度回调,通知上层该片段已完成
+                if progress_callback:
+                    content_preview = content[:30] + "..." if len(content) > 30 else content
+                    should_continue = progress_callback(index + 1, total, role, content_preview)
+                    if should_continue is False:
+                        logger.info("Audio generation cancelled by callback after TTS %d/%d", index + 1, total)
+                        break
             else:
                 logger.error("[TTS %d/%d] ✗ %s 语音生成失败", index + 1, total, role)
                 
@@ -150,11 +163,13 @@ class AudioGenerationService:
         
         try:
             logger.debug("Calling TTS API for voice %s: %s...", voice, text[:20])
+            # Use configurable timeout if available; default to 300 seconds for robustness.
+            timeout = getattr(self._config, "tts_timeout", 300)
             response = requests.post(
                 self._config.tts_base_url,
                 json=payload,
                 headers=headers,
-                timeout=300
+                timeout=timeout
             )
             
             if response.status_code == 200:

+ 7 - 1
Co-creation-projects/JJason-DeepCastAgent/backend/src/services/audio_synthesizer.py

@@ -33,13 +33,14 @@ class PodcastSynthesisService:
         # 确保 pydub/ffmpeg 可用 - 假设 ffmpeg 已安装在系统中
         # 如果没有,pydub 可能会发出警告或失败,但我们会捕获异常。
 
-    def synthesize_podcast(self, audio_files: list[str], task_id: str = "default") -> str | None:
+    def synthesize_podcast(self, audio_files: list[str], task_id: str = "default", cancel_check: callable = None) -> str | None:
         """
         将音频文件组合成单个播客 MP3。
 
         Args:
             audio_files: 按顺序排列的输入音频文件路径列表。
             task_id: 输出文件名的唯一标识符。
+            cancel_check: 可选的取消检查回调,返回 True 表示已取消。
 
         Returns:
             最终播客文件的路径,如果失败则为 None。
@@ -56,6 +57,11 @@ class PodcastSynthesisService:
 
             valid_segments_count = 0
             for file_path in audio_files:
+                # 检查是否已取消
+                if cancel_check and cancel_check():
+                    logger.info("Podcast synthesis cancelled.")
+                    return None
+                    
                 path = Path(file_path)
                 if not path.exists():
                     logger.warning("Audio file not found: %s", file_path)

+ 16 - 4
Co-creation-projects/JJason-DeepCastAgent/backend/src/services/script_generator.py

@@ -39,11 +39,23 @@ SCRIPT_JSON_SCHEMA = {
 class ScriptGenerationService:
     """从研究报告生成对话脚本(使用结构化输出)。"""
 
-    def __init__(self, script_agent, config: Configuration) -> None:
-        """初始化服务。"""
+    def __init__(
+        self,
+        config: Configuration,
+        script_agent: OpenAI | None = None,
+    ) -> None:
+        """
+        初始化服务。
+
+        Args:
+            config: 全局配置对象。
+            script_agent: 可选的自定义脚本生成客户端/代理。
+                如果提供,将直接使用该客户端;否则将基于配置创建默认的 OpenAI 客户端。
+        """
         self._config = config
-        # 直接使用 OpenAI 客户端以支持结构化输出
-        self._client = OpenAI(
+        # 优先使用注入的自定义客户端,以保持向后兼容和可测试性;
+        # 如果未提供,则基于配置创建默认的 OpenAI 客户端以支持结构化输出。
+        self._client = script_agent or OpenAI(
             api_key=config.llm_api_key,
             base_url=config.llm_base_url,
         )

+ 3 - 3
Co-creation-projects/JJason-DeepCastAgent/backend/src/services/tool_events.py

@@ -130,14 +130,14 @@ class ToolCallTracker:
 
     def reset(self) -> None:
         """
-        清除记录的工具调用事件。
+        重置当前已记录的工具调用事件。
         
-        此方法是线程安全的,确保在多线程环境中安全调用。
+        该方法会清空内部事件列表并重置游标,用于在同一
+        Tracker 实例上复用时避免跨任务/会话的事件泄漏。
         """
         with self._lock:
             self._events.clear()
             self._cursor = 0
-
     def as_dicts(self) -> list[dict[str, Any]]:
         """
         暴露原始事件的快照以实现向后兼容性。

+ 8 - 2
Co-creation-projects/JJason-DeepCastAgent/frontend/src/App.vue

@@ -257,7 +257,7 @@
 
 <script lang="ts" setup>
 import { reactive, ref, nextTick } from "vue";
-import { runResearchStream, type ResearchStreamEvent } from "./services/api";
+import { runResearchStream, cancelResearch, type ResearchStreamEvent } from "./services/api";
 import MarkdownIt from "markdown-it";
 
 // Markdown renderer
@@ -431,6 +431,9 @@ function handleStreamEvent(event: ResearchStreamEvent) {
     if (stage === "report") productionStage.value = "research";
     else if (stage === "script") productionStage.value = "script";
     else if (stage === "audio") productionStage.value = "audio";
+    // Backend distinguishes a separate "synthesis" stage for final audio stitching,
+    // but the UI groups it under the overall "audio" production stage for simplicity.
+    else if (stage === "synthesis") productionStage.value = "audio";
   }
 
   // 3. Task / Tool Updates (Simplified logging)
@@ -530,10 +533,13 @@ function handleStreamEvent(event: ResearchStreamEvent) {
 function cancelProduction() {
   if (confirm("确定要取消制作吗?")) {
     addLog("🛑 用户请求取消制作...");
+    // 先通知后端停止,再断开 SSE 连接
+    cancelResearch().then(() => {
+      addLog("✅ 后端已接收取消请求");
+    });
     if (abortController) {
       abortController.abort();
       abortController = null;
-      addLog("✅ 已发送取消请求到后端");
     }
     stopWaitingAnimation();
     productionStage.value = "done";

+ 11 - 0
Co-creation-projects/JJason-DeepCastAgent/frontend/src/services/api.ts

@@ -14,6 +14,17 @@ export interface StreamOptions {
   signal?: AbortSignal;
 }
 
+/**
+ * 主动取消后端正在执行的研究任务。
+ */
+export async function cancelResearch(): Promise<void> {
+  try {
+    await fetch(`${baseURL}/research/cancel`, { method: "POST" });
+  } catch (err) {
+    console.warn("Failed to send cancel request:", err);
+  }
+}
+
 export async function runResearchStream(
   payload: ResearchRequest,
   onEvent: (event: ResearchStreamEvent) => void,