writing_service.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. """
  2. 写作服务
  3. """
  4. from typing import Optional, List, Dict, Any
  5. from sqlalchemy.orm import Session
  6. from ..core.database import get_db
  7. from ..models.writing import WritingDB, Writing, WritingCreate, WritingUpdate
  8. from ..core.exceptions import WritingNotFoundError
  9. from ..services.paper_service import PaperService
  10. from ..utils.citation_formatter import CitationFormatter
  11. import json
  12. import re
  13. class WritingService:
  14. """写作服务类"""
  15. def __init__(self, db: Session):
  16. self.db = db
  17. self.paper_service = PaperService(db)
  18. self.citation_formatter = CitationFormatter()
  19. def get_writing_by_id(self, writing_id: int) -> Optional[Writing]:
  20. """根据ID获取写作"""
  21. writing_db = self.db.query(WritingDB).filter(WritingDB.id == writing_id).first()
  22. if not writing_db:
  23. raise WritingNotFoundError(f"Writing with id {writing_id} not found")
  24. return Writing.from_orm(writing_db)
  25. def get_writings_by_user(self, user_id: int, skip: int = 0, limit: int = 20) -> List[Writing]:
  26. """获取用户的写作列表"""
  27. writings_db = self.db.query(WritingDB).filter(
  28. WritingDB.user_id == user_id
  29. ).order_by(WritingDB.created_at.desc()).offset(skip).limit(limit).all()
  30. return [Writing.from_orm(writing) for writing in writings_db]
  31. def create_writing(self, writing_create: WritingCreate, user_id: int, task_id: Optional[int] = None) -> Writing:
  32. """创建写作"""
  33. # 计算字数
  34. content = writing_create.content or ""
  35. word_count = len(re.findall(r'\S+', content))
  36. writing_db = WritingDB(
  37. title=writing_create.title,
  38. writing_type=writing_create.writing_type,
  39. content=content,
  40. outline=json.dumps(writing_create.outline or []),
  41. paper_ids=json.dumps(writing_create.paper_ids),
  42. word_count=word_count,
  43. user_id=user_id,
  44. task_id=task_id
  45. )
  46. self.db.add(writing_db)
  47. self.db.commit()
  48. self.db.refresh(writing_db)
  49. return Writing.from_orm(writing_db)
  50. def update_writing(self, writing_id: int, writing_update: WritingUpdate) -> Writing:
  51. """更新写作"""
  52. writing_db = self.db.query(WritingDB).filter(WritingDB.id == writing_id).first()
  53. if not writing_db:
  54. raise WritingNotFoundError(f"Writing with id {writing_id} not found")
  55. # 更新字段
  56. update_data = writing_update.dict(exclude_unset=True)
  57. for field, value in update_data.items():
  58. if field in ['outline', 'sections', 'citations', 'paper_ids']:
  59. setattr(writing_db, field, json.dumps(value))
  60. else:
  61. setattr(writing_db, field, value)
  62. # 重新计算字数
  63. if 'content' in update_data:
  64. writing_db.word_count = len(re.findall(r'\S+', writing_db.content or ""))
  65. self.db.commit()
  66. self.db.refresh(writing_db)
  67. return Writing.from_orm(writing_db)
  68. def delete_writing(self, writing_id: int) -> bool:
  69. """删除写作"""
  70. writing_db = self.db.query(WritingDB).filter(WritingDB.id == writing_id).first()
  71. if not writing_db:
  72. raise WritingNotFoundError(f"Writing with id {writing_id} not found")
  73. self.db.delete(writing_db)
  74. self.db.commit()
  75. return True
  76. def get_writing_statistics(self, user_id: int) -> Dict[str, Any]:
  77. """获取写作统计信息"""
  78. total_writings = self.db.query(WritingDB).filter(WritingDB.user_id == user_id).count()
  79. # 按类型统计
  80. type_stats = self.db.query(
  81. WritingDB.writing_type,
  82. self.db.func.count(WritingDB.id)
  83. ).filter(WritingDB.user_id == user_id).group_by(WritingDB.writing_type).all()
  84. # 按状态统计
  85. status_stats = self.db.query(
  86. WritingDB.status,
  87. self.db.func.count(WritingDB.id)
  88. ).filter(WritingDB.user_id == user_id).group_by(WritingDB.status).all()
  89. # 总字数
  90. total_words = self.db.query(
  91. self.db.func.sum(WritingDB.word_count)
  92. ).filter(WritingDB.user_id == user_id).scalar() or 0
  93. # 平均质量分数
  94. avg_quality = self.db.query(
  95. self.db.func.avg(WritingDB.quality_score)
  96. ).filter(WritingDB.user_id == user_id).scalar() or 0
  97. return {
  98. 'total_writings': total_writings,
  99. 'total_words': int(total_words),
  100. 'average_quality': float(avg_quality),
  101. 'type_distribution': dict(type_stats),
  102. 'status_distribution': dict(status_stats)
  103. }
  104. def format_citations(self, writing_id: int, style: str = "APA") -> Dict[str, Any]:
  105. """格式化引用"""
  106. writing = self.get_writing_by_id(writing_id)
  107. # 获取参考论文
  108. paper_ids = json.loads(writing.paper_ids or "[]")
  109. papers = []
  110. for paper_id in paper_ids:
  111. try:
  112. paper = self.paper_service.get_paper_by_id(paper_id)
  113. papers.append(paper)
  114. except Exception:
  115. continue
  116. # 格式化引用
  117. formatted_citations = self.citation_formatter.format_papers(papers, style)
  118. # 更新写作中的引用
  119. citations_data = [
  120. {
  121. 'id': paper.id,
  122. 'title': paper.title,
  123. 'authors': paper.authors,
  124. 'journal': paper.journal,
  125. 'year': paper.publication_year,
  126. 'formatted': formatted_citations.get(str(paper.id), "")
  127. }
  128. for paper in papers
  129. ]
  130. self.update_writing(writing_id, WritingUpdate(citations=citations_data))
  131. return {
  132. 'citations': citations_data,
  133. 'bibliography': self.citation_formatter.generate_bibliography(papers, style)
  134. }
  135. def export_writing(self, writing_id: int, format: str = "markdown", include_citations: bool = True) -> Dict[str, Any]:
  136. """导出写作"""
  137. writing = self.get_writing_by_id(writing_id)
  138. if format == "markdown":
  139. return self._export_to_markdown(writing, include_citations)
  140. elif format == "latex":
  141. return self._export_to_latex(writing, include_citations)
  142. elif format == "docx":
  143. return self._export_to_docx(writing, include_citations)
  144. elif format == "pdf":
  145. return self._export_to_pdf(writing, include_citations)
  146. else:
  147. raise ValueError(f"Unsupported export format: {format}")
  148. def _export_to_markdown(self, writing: Writing, include_citations: bool = True) -> str:
  149. """导出为Markdown格式"""
  150. markdown = f"# {writing.title}\n\n"
  151. markdown += f"**类型**: {writing.writing_type}\n\n"
  152. markdown += f"**字数**: {writing.word_count}\n\n"
  153. markdown += f"**创建时间**: {writing.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
  154. if writing.content:
  155. markdown += f"{writing.content}\n\n"
  156. if include_citations and writing.citations:
  157. markdown += "## 参考文献\n\n"
  158. for citation in writing.citations:
  159. markdown += f"- {citation.get('formatted', '')}\n"
  160. return markdown
  161. def _export_to_latex(self, writing: Writing, include_citations: bool = True) -> str:
  162. """导出为LaTeX格式"""
  163. latex = "\\documentclass{article}\n"
  164. latex += "\\usepackage[utf8]{inputenc}\n"
  165. latex += "\\usepackage{cite}\n\n"
  166. latex += "\\begin{document}\n\n"
  167. latex += f"\\title{{{writing.title}}}\n"
  168. latex += "\\maketitle\n\n"
  169. if writing.content:
  170. # 简单的Markdown到LaTeX转换
  171. content = writing.content.replace("**", "\\textbf{").replace("**", "}")
  172. content = content.replace("*", "\\textit{").replace("*", "}")
  173. latex += f"{content}\n\n"
  174. if include_citations and writing.citations:
  175. latex += "\\bibliographystyle{plain}\n"
  176. latex += "\\bibliography{references}\n"
  177. latex += "\\end{document}"
  178. return latex
  179. def _export_to_docx(self, writing: Writing, include_citations: bool = True) -> bytes:
  180. """导出为Word文档格式"""
  181. # 这里可以使用python-docx库
  182. # 暂时返回Markdown内容的字节
  183. content = self._export_to_markdown(writing, include_citations)
  184. return content.encode('utf-8')
  185. def _export_to_pdf(self, writing: Writing, include_citations: bool = True) -> bytes:
  186. """导出为PDF格式"""
  187. # 这里可以使用reportlab或其他PDF生成库
  188. # 暂时返回Markdown内容的字节
  189. content = self._export_to_markdown(writing, include_citations)
  190. return content.encode('utf-8')
  191. def generate_outline(self, topic: str, paper_ids: List[int], writing_type: str = "review") -> List[Dict[str, Any]]:
  192. """生成写作大纲"""
  193. # 获取参考论文
  194. papers = []
  195. for paper_id in paper_ids:
  196. try:
  197. paper = self.paper_service.get_paper_by_id(paper_id)
  198. papers.append(paper)
  199. except Exception:
  200. continue
  201. # 根据写作类型生成大纲模板
  202. if writing_type == "review":
  203. outline = [
  204. {"title": "引言", "level": 1, "content": "研究背景和意义"},
  205. {"title": "文献综述", "level": 1, "content": "相关研究工作总结"},
  206. {"title": "方法论分析", "level": 1, "content": "研究方法比较"},
  207. {"title": "主要发现", "level": 1, "content": "研究成果总结"},
  208. {"title": "讨论与展望", "level": 1, "content": "研究局限性和未来方向"},
  209. {"title": "结论", "level": 1, "content": "研究总结"}
  210. ]
  211. elif writing_type == "summary":
  212. outline = [
  213. {"title": "研究背景", "level": 1, "content": "研究动机和目标"},
  214. {"title": "研究方法", "level": 1, "content": "实验设计和数据分析"},
  215. {"title": "主要结果", "level": 1, "content": "关键发现和数据分析"},
  216. {"title": "结论与意义", "level": 1, "content": "研究贡献和影响"}
  217. ]
  218. elif writing_type == "critique":
  219. outline = [
  220. {"title": "论文概述", "level": 1, "content": "研究内容总结"},
  221. {"title": "优点分析", "level": 1, "content": "研究的创新性和贡献"},
  222. {"title": "问题与局限", "level": 1, "content": "研究中存在的问题"},
  223. {"title": "改进建议", "level": 1, "content": "对研究的改进意见"}
  224. ]
  225. else:
  226. outline = [
  227. {"title": "研究背景", "level": 1, "content": ""},
  228. {"title": "研究目标", "level": 1, "content": ""},
  229. {"title": "研究方法", "level": 1, "content": ""},
  230. {"title": "预期结果", "level": 1, "content": ""},
  231. {"title": "研究意义", "level": 1, "content": ""}
  232. ]
  233. return outline