| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207 |
- from typing import List, Dict, Optional, Any
- import os
- import hashlib
- import sqlite3
- import time
- import json
- from ..embedding import get_text_embedder, get_dimension
- from ..storage.qdrant_store import QdrantVectorStore
- def _get_markitdown_instance():
- """
- Get a configured MarkItDown instance for document conversion.
- """
- try:
- from markitdown import MarkItDown
- return MarkItDown()
- except ImportError:
- print("[WARNING] MarkItDown not available. Install with: pip install markitdown")
- return None
- def _is_markitdown_supported_format(path: str) -> bool:
- """
- Check if the file format is supported by MarkItDown.
- Supports: PDF, Office docs (docx, xlsx, pptx), images (jpg, png, gif, bmp, tiff),
- audio (mp3, wav, m4a), HTML, text formats (txt, md, csv, json, xml), ZIP files, etc.
- """
- ext = (os.path.splitext(path)[1] or '').lower()
- supported_formats = {
- # Documents
- '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx',
- # Text formats
- '.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm',
- # Images (OCR + metadata)
- '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.tif', '.webp',
- # Audio (transcription + metadata)
- '.mp3', '.wav', '.m4a', '.aac', '.flac', '.ogg',
- # Archives
- '.zip', '.tar', '.gz', '.rar',
- # Code files
- '.py', '.js', '.ts', '.java', '.cpp', '.c', '.h', '.css', '.scss',
- # Other text
- '.log', '.conf', '.ini', '.cfg', '.yaml', '.yml', '.toml'
- }
- return ext in supported_formats
- def _convert_to_markdown(path: str) -> str:
- """
- Universal document reader using MarkItDown with enhanced PDF processing.
- Converts any supported file format to markdown text.
- """
- if not os.path.exists(path):
- return ""
-
- # 对PDF文件使用增强处理
- ext = (os.path.splitext(path)[1] or '').lower()
- if ext == '.pdf':
- return _enhanced_pdf_processing(path)
-
- # 其他格式使用原有MarkItDown
- md_instance = _get_markitdown_instance()
- if md_instance is None:
- return _fallback_text_reader(path)
-
- try:
- result = md_instance.convert(path)
- text = getattr(result, "text_content", None)
- if isinstance(text, str) and text.strip():
- return text
- return ""
- except Exception as e:
- print(f"[WARNING] MarkItDown failed for {path}: {e}")
- return _fallback_text_reader(path)
def _enhanced_pdf_processing(path: str) -> str:
    """
    Extract PDF text with MarkItDown and clean it with ``_post_process_pdf_text``.

    Falls back to ``_fallback_text_reader`` when MarkItDown is unavailable or
    when extraction/post-processing raises. Returns ``""`` when MarkItDown
    yields no non-whitespace text.
    """
    print(f"[RAG] Using enhanced PDF processing for: {path}")

    converter = _get_markitdown_instance()
    if converter is None:
        return _fallback_text_reader(path)

    try:
        raw_text = getattr(converter.convert(path), "text_content", None)
        if not (raw_text and raw_text.strip()):
            return ""
        cleaned_text = _post_process_pdf_text(raw_text)
        print(f"[RAG] PDF post-processing completed: {len(raw_text)} -> {len(cleaned_text)} chars")
        return cleaned_text
    except Exception as e:
        print(f"[WARNING] Enhanced PDF processing failed for {path}: {e}")
        # NOTE(review): this reads the PDF bytes as text, which is likely noisy.
        return _fallback_text_reader(path)
- def _post_process_pdf_text(text: str) -> str:
- """
- Post-process PDF text to improve quality.
- """
- import re
-
- # 1. 按行分割并清理
- lines = text.splitlines()
- cleaned_lines = []
-
- for line in lines:
- line = line.strip()
- if not line:
- continue
-
- # 移除单个字符的行(通常是噪音)
- if len(line) <= 2 and not line.isdigit():
- continue
-
- # 移除明显的页眉页脚噪音
- if re.match(r'^\d+$', line): # 纯数字行(页码)
- continue
- if line.lower() in ['github', 'project', 'forks', 'stars', 'language']:
- continue
-
- cleaned_lines.append(line)
-
- # 2. 智能合并短行
- merged_lines = []
- i = 0
-
- while i < len(cleaned_lines):
- current_line = cleaned_lines[i]
-
- # 如果当前行很短,尝试与下一行合并
- if len(current_line) < 60 and i + 1 < len(cleaned_lines):
- next_line = cleaned_lines[i + 1]
-
- # 合并条件:都是内容,不是标题
- if (not current_line.endswith(':') and
- not current_line.endswith(':') and
- not current_line.startswith('#') and
- not next_line.startswith('#') and
- len(next_line) < 120):
-
- merged_line = current_line + " " + next_line
- merged_lines.append(merged_line)
- i += 2 # 跳过下一行
- continue
-
- merged_lines.append(current_line)
- i += 1
-
- # 3. 重新组织段落
- paragraphs = []
- current_paragraph = []
-
- for line in merged_lines:
- # 检查是否是新段落的开始
- if (line.startswith('#') or # 标题
- line.endswith(':') or # 中文冒号结尾
- line.endswith(':') or # 英文冒号结尾
- len(line) > 150 or # 长句通常是段落开始
- not current_paragraph): # 第一行
-
- # 保存当前段落
- if current_paragraph:
- paragraphs.append(' '.join(current_paragraph))
- current_paragraph = []
-
- paragraphs.append(line)
- else:
- current_paragraph.append(line)
-
- # 添加最后一个段落
- if current_paragraph:
- paragraphs.append(' '.join(current_paragraph))
-
- return '\n\n'.join(paragraphs)
- def _fallback_text_reader(path: str) -> str:
- """
- Simple fallback reader for basic text files when MarkItDown is unavailable.
- """
- try:
- with open(path, 'r', encoding='utf-8', errors='ignore') as f:
- return f.read()
- except Exception:
- try:
- with open(path, 'r', encoding='latin-1', errors='ignore') as f:
- return f.read()
- except Exception:
- return ""
- def _detect_lang(sample: str) -> str:
- try:
- from langdetect import detect
- return detect(sample[:1000]) if sample else "unknown"
- except Exception:
- return "unknown"
- def _is_cjk(ch: str) -> bool:
- code = ord(ch)
- return (
- 0x4E00 <= code <= 0x9FFF or
- 0x3400 <= code <= 0x4DBF or
- 0x20000 <= code <= 0x2A6DF or
- 0x2A700 <= code <= 0x2B73F or
- 0x2B740 <= code <= 0x2B81F or
- 0x2B820 <= code <= 0x2CEAF or
- 0xF900 <= code <= 0xFAFF
- )
def _approx_token_len(text: str) -> int:
    """
    Cheaply approximate the token count of ``text``.

    Each CJK ideograph (per ``_is_cjk``) counts as one token, and each
    whitespace-separated piece counts as one more, so a run of CJK
    characters contributes its length plus one.
    """
    cjk_chars = sum(map(_is_cjk, text))
    whitespace_pieces = len(text.split())
    return cjk_chars + whitespace_pieces
- def _split_paragraphs_with_headings(text: str) -> List[Dict]:
- lines = text.splitlines()
- heading_stack: List[str] = []
- paragraphs: List[Dict] = []
- buf: List[str] = []
- char_pos = 0
- def flush_buf(end_pos: int):
- if not buf:
- return
- content = "\n".join(buf).strip()
- if not content:
- return
- paragraphs.append({
- "content": content,
- "heading_path": " > ".join(heading_stack) if heading_stack else None,
- "start": max(0, end_pos - len(content)),
- "end": end_pos,
- })
- for ln in lines:
- raw = ln
- if raw.strip().startswith("#"):
- # heading line
- flush_buf(char_pos)
- level = len(raw) - len(raw.lstrip('#'))
- title = raw.lstrip('#').strip()
- if level <= 0:
- level = 1
- if level <= len(heading_stack):
- heading_stack = heading_stack[:level-1]
- heading_stack.append(title)
- char_pos += len(raw) + 1
- continue
- # paragraph accumulation
- if raw.strip() == "":
- flush_buf(char_pos)
- buf = []
- else:
- buf.append(raw)
- char_pos += len(raw) + 1
- flush_buf(char_pos)
- if not paragraphs:
- paragraphs = [{"content": text, "heading_path": None, "start": 0, "end": len(text)}]
- return paragraphs
- def _chunk_paragraphs(paragraphs: List[Dict], chunk_tokens: int, overlap_tokens: int) -> List[Dict]:
- chunks: List[Dict] = []
- cur: List[Dict] = []
- cur_tokens = 0
- i = 0
- while i < len(paragraphs):
- p = paragraphs[i]
- p_tokens = _approx_token_len(p["content"]) or 1
- if cur_tokens + p_tokens <= chunk_tokens or not cur:
- cur.append(p)
- cur_tokens += p_tokens
- i += 1
- else:
- # emit current chunk
- content = "\n\n".join(x["content"] for x in cur)
- start = cur[0]["start"]
- end = cur[-1]["end"]
- heading_path = next((x["heading_path"] for x in reversed(cur) if x.get("heading_path")), None)
- chunks.append({
- "content": content,
- "start": start,
- "end": end,
- "heading_path": heading_path,
- })
- # build overlap by keeping tail tokens
- if overlap_tokens > 0 and cur:
- kept: List[Dict] = []
- kept_tokens = 0
- for x in reversed(cur):
- t = _approx_token_len(x["content"]) or 1
- if kept_tokens + t > overlap_tokens:
- break
- kept.append(x)
- kept_tokens += t
- cur = list(reversed(kept))
- cur_tokens = kept_tokens
- else:
- cur = []
- cur_tokens = 0
- if cur:
- content = "\n\n".join(x["content"] for x in cur)
- start = cur[0]["start"]
- end = cur[-1]["end"]
- heading_path = next((x["heading_path"] for x in reversed(cur) if x.get("heading_path")), None)
- chunks.append({
- "content": content,
- "start": start,
- "end": end,
- "heading_path": heading_path,
- })
- return chunks
def load_and_chunk_texts(paths: List[str], chunk_size: int = 800, chunk_overlap: int = 100, namespace: Optional[str] = None, source_label: str = "rag") -> List[Dict]:
    """
    Convert files to markdown and split them into de-duplicated chunks.

    Each existing path is converted with ``_convert_to_markdown``, split with
    ``_split_paragraphs_with_headings`` and packed with ``_chunk_paragraphs``
    (sizes are in that helper's approximate token units). A chunk whose
    stripped text has the same MD5 as an earlier chunk from any file in this
    call is dropped.

    Args:
        paths: Files to load; missing files and files without extractable
            text are skipped with a warning.
        chunk_size: Target chunk size (clamped to at least 1).
        chunk_overlap: Overlap between consecutive chunks (clamped to >= 0).
        namespace: Stored as ``metadata["namespace"]`` (``"default"`` if unset).
        source_label: Stored as ``metadata["source"]``.

    Returns:
        Chunk dicts with ``id``, ``content`` and ``metadata``.
    """
    print(f"[RAG] Universal loader start: files={len(paths)} chunk_size={chunk_size} overlap={chunk_overlap} ns={namespace or 'default'}")
    collected: List[Dict] = []
    seen_hashes = set()

    for path in paths:
        if not os.path.exists(path):
            print(f"[WARNING] File not found: {path}")
            continue

        print(f"[RAG] Processing: {path}")
        file_ext = (os.path.splitext(path)[1] or '').lower()

        markdown_text = _convert_to_markdown(path)
        if not markdown_text.strip():
            print(f"[WARNING] No content extracted from: {path}")
            continue

        lang = _detect_lang(markdown_text)
        # The document id is derived from the path and the converted text length.
        doc_id = hashlib.md5(f"{path}|{len(markdown_text)}".encode('utf-8')).hexdigest()

        pieces = _chunk_paragraphs(
            _split_paragraphs_with_headings(markdown_text),
            chunk_tokens=max(1, chunk_size),
            overlap_tokens=max(0, chunk_overlap),
        )
        for piece in pieces:
            content = piece["content"]
            normalized = content.strip()
            if not normalized:
                continue
            content_hash = hashlib.md5(normalized.encode('utf-8')).hexdigest()
            if content_hash in seen_hashes:
                continue
            seen_hashes.add(content_hash)

            start = piece.get("start", 0)
            end = piece.get("end", start + len(content))
            collected.append({
                "id": hashlib.md5(f"{doc_id}|{start}|{end}|{content_hash}".encode('utf-8')).hexdigest(),
                "content": content,
                "metadata": {
                    "source_path": path,
                    "file_ext": file_ext,
                    "doc_id": doc_id,
                    "lang": lang,
                    "start": start,
                    "end": end,
                    "content_hash": content_hash,
                    "namespace": namespace or "default",
                    "source": source_label,
                    "external": True,
                    "heading_path": piece.get("heading_path"),
                    "format": "markdown",  # all content went through markdown conversion
                },
            })

    print(f"[RAG] Universal loader done: total_chunks={len(collected)}")
    return collected
def build_graph_from_chunks(neo4j, chunks: List[Dict]) -> None:
    """
    Mirror chunks into a graph store as Document and Memory entities.

    Each distinct ``doc_id`` gets one ``Document`` entity. Every chunk becomes
    a ``Memory`` entity, linked from its document by a ``HAS_CHUNK``
    relationship when the chunk has a ``doc_id``. The operation is
    best-effort: a failing write does not stop the others. The first failure
    is printed in full and a failure count is printed at the end.

    Args:
        neo4j: Graph store exposing ``add_entity(entity_id=, name=,
            entity_type=, properties=)`` and ``add_relationship(from_id=,
            to_id=, rel_type=, properties=)``.
        chunks: Chunk dicts with ``id`` and optional ``metadata``
            (``source_path``, ``doc_id``, ``lang``, ``start``, ``end``).
    """
    created_docs = set()
    failures = 0

    def report(what: str, err: Exception) -> None:
        # Print only the first error to avoid flooding the log when the store is down.
        nonlocal failures
        failures += 1
        if failures == 1:
            print(f"[WARNING] Graph build: failed to add {what}: {err}")

    for ch in chunks:
        mem_id = ch["id"]
        meta = ch.get("metadata", {})
        source_path = meta.get("source_path")
        doc_id = meta.get("doc_id")
        if doc_id and doc_id not in created_docs:
            created_docs.add(doc_id)
            try:
                neo4j.add_entity(
                    entity_id=doc_id,
                    name=os.path.basename(source_path or doc_id),
                    entity_type="Document",
                    properties={"source_path": source_path, "lang": meta.get("lang")},
                )
            except Exception as e:
                report(f"document {doc_id}", e)
        try:
            neo4j.add_entity(entity_id=mem_id, name=mem_id, entity_type="Memory", properties={
                "source_path": source_path,
                "doc_id": doc_id,
                "start": meta.get("start"),
                "end": meta.get("end"),
            })
        except Exception as e:
            report(f"chunk {mem_id}", e)
        if doc_id:
            try:
                neo4j.add_relationship(from_id=doc_id, to_id=mem_id, rel_type="HAS_CHUNK", properties={})
            except Exception as e:
                report(f"relationship {doc_id} -> {mem_id}", e)

    if failures:
        print(f"[WARNING] Graph build finished with {failures} failed operation(s); graph may be incomplete")
- def _preprocess_markdown_for_embedding(text: str) -> str:
- """
- Preprocess markdown text for better embedding quality.
- Removes excessive markup while preserving semantic content.
- """
- import re
-
- # Remove markdown headers symbols but keep the text
- text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE)
-
- # Remove markdown links but keep the text
- text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', text)
-
- # Remove markdown emphasis markers
- text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text) # bold
- text = re.sub(r'\*([^*]+)\*', r'\1', text) # italic
- text = re.sub(r'`([^`]+)`', r'\1', text) # inline code
-
- # Remove markdown code blocks but keep content
- text = re.sub(r'```[^\n]*\n([\s\S]*?)```', r'\1', text)
-
- # Remove excessive whitespace
- text = re.sub(r'\n\s*\n', '\n\n', text)
- text = re.sub(r'[ \t]+', ' ', text)
-
- return text.strip()
def _create_default_vector_store(dimension: int = None) -> QdrantVectorStore:
    """
    Obtain the RAG vector store from ``QdrantConnectionManager.get_instance``.

    The store targets the ``hello_agents_rag_vectors`` collection with cosine
    distance. Going through the connection manager avoids opening duplicate
    connections. The URL and API key come from the ``QDRANT_URL`` and
    ``QDRANT_API_KEY`` environment variables.

    Args:
        dimension: Vector size; when ``None`` it is ``get_dimension(384)``.
    """
    vector_size = get_dimension(384) if dimension is None else dimension

    url = os.getenv("QDRANT_URL")
    api_key = os.getenv("QDRANT_API_KEY")

    from ..storage.qdrant_store import QdrantConnectionManager
    return QdrantConnectionManager.get_instance(
        url=url,
        api_key=api_key,
        collection_name="hello_agents_rag_vectors",
        vector_size=vector_size,
        distance="cosine",
    )
- # Cache functions removed - using unified embedder with internal caching
def _fit_vector_dimension(values, dimension: int) -> List[float]:
    """Convert ``values`` to floats and zero-pad or truncate them to ``dimension`` (warning on mismatch)."""
    vec = [float(x) for x in values]
    if len(vec) != dimension:
        print(f"[WARNING] 向量维度异常: 期望{dimension}, 实际{len(vec)}")
        if len(vec) < dimension:
            vec.extend([0.0] * (dimension - len(vec)))
        else:
            vec = vec[:dimension]
    return vec


def _normalize_batch_embeddings(raw) -> List[list]:
    """
    Coerce an embedder's batch output into a list of per-text vectors.

    Accepts a 2-D array-like (anything with ``tolist``), a list of 1-D
    arrays/lists, or a single flat vector, which is wrapped as one row.
    """
    if hasattr(raw, "tolist"):
        raw = raw.tolist()
    rows = [r.tolist() if hasattr(r, "tolist") else r for r in raw]
    if rows and not isinstance(rows[0], (list, tuple)):
        # A flat sequence of numbers: the embedder returned a single vector.
        return [rows]
    return [list(r) for r in rows]


def _encode_batch(embedder, texts: List[str], dimension: int) -> List[List[float]]:
    """
    Embed ``texts`` and return exactly ``len(texts)`` vectors of ``dimension`` floats.

    Exceptions from ``embedder.encode`` propagate. Rows that cannot be
    converted become zero vectors. A row count different from ``len(texts)``
    is truncated or padded with zero vectors, so vectors stay aligned with
    their chunks.
    """
    raw = embedder.encode(texts)
    vectors: List[List[float]] = []
    for row in _normalize_batch_embeddings(raw):
        try:
            vectors.append(_fit_vector_dimension(row, dimension))
        except Exception as e:
            print(f"[WARNING] 向量转换失败: {e}, 使用零向量")
            vectors.append([0.0] * dimension)
    if len(vectors) != len(texts):
        print(f"[WARNING] Embedding count mismatch: expected {len(texts)}, got {len(vectors)}")
        vectors = vectors[:len(texts)]
        vectors.extend([0.0] * dimension for _ in range(len(texts) - len(vectors)))
    return vectors


def index_chunks(
    store = None,
    chunks: List[Dict] = None,
    cache_db: Optional[str] = None,
    batch_size: int = 64,
    rag_namespace: str = "default"
) -> None:
    """
    Embed RAG chunks and upsert them into a Qdrant vector store.

    Chunk text is cleaned with ``_preprocess_markdown_for_embedding`` for
    embedding, while the original markdown is stored in the payload. Texts
    are embedded ``batch_size`` at a time with the embedder from
    ``get_text_embedder``. A failing batch is retried in sub-batches of 8,
    sleeping 2 s before each to ease rate limits. Texts that still fail get
    zero vectors, so every chunk keeps a vector aligned with its id and
    metadata.

    Args:
        store: Vector store exposing ``add_vectors(vectors=, metadata=, ids=)``;
            a default Qdrant store is created when ``None``.
        chunks: Chunk dicts with ``id``, ``content`` and optional ``metadata``.
        cache_db: Unused; kept for backward compatibility.
        batch_size: Texts per embedding request (values below 1 are treated as 1).
        rag_namespace: Stored as ``rag_namespace`` in every payload.

    Raises:
        RuntimeError: If ``store.add_vectors`` reports failure.
    """
    import time

    if not chunks:
        print("[RAG] No chunks to index")
        return

    embedder = get_text_embedder()
    dimension = get_dimension(384)

    if store is None:
        store = _create_default_vector_store(dimension)
        print(f"[RAG] Created default Qdrant store with dimension {dimension}")

    # Embed cleaned text; the stored payload keeps the original markdown.
    processed_texts = [_preprocess_markdown_for_embedding(c["content"]) for c in chunks]
    batch_size = max(1, batch_size)  # range() rejects a step of 0

    print(f"[RAG] Embedding start: total_texts={len(processed_texts)} batch_size={batch_size}")

    vecs: List[List[float]] = []
    for i in range(0, len(processed_texts), batch_size):
        part = processed_texts[i:i + batch_size]
        try:
            vecs.extend(_encode_batch(embedder, part, dimension))
        except Exception as e:
            print(f"[WARNING] Batch {i} encoding failed: {e}")
            print(f"[RAG] Retrying batch {i} with smaller chunks...")
            any_success = False
            for j in range(0, len(part), 8):
                small_part = part[j:j + 8]
                time.sleep(2)  # back off to avoid provider rate limits
                try:
                    vecs.extend(_encode_batch(embedder, small_part, dimension))
                    any_success = True
                except Exception as e2:
                    print(f"[WARNING] 小批次 {j//8} 仍然失败: {e2}")
                    vecs.extend([0.0] * dimension for _ in small_part)
            if not any_success:
                print(f"[ERROR] 批次 {i} 完全失败,使用零向量")

        print(f"[RAG] Embedding progress: {min(i+batch_size, len(processed_texts))}/{len(processed_texts)}")

    # Payload for each vector: RAG markers plus the chunk's own metadata.
    metas: List[Dict] = []
    ids: List[str] = []
    for ch in chunks:
        meta = {
            "memory_id": ch["id"],
            "user_id": "rag_user",
            "memory_type": "rag_chunk",
            "content": ch["content"],  # original markdown content
            "data_source": "rag_pipeline",  # RAG identification tag
            "rag_namespace": rag_namespace,
            "is_rag_data": True,  # explicit RAG data marker
        }
        meta.update(ch.get("metadata", {}))
        metas.append(meta)
        ids.append(ch["id"])

    print(f"[RAG] Qdrant upsert start: n={len(vecs)}")
    success = store.add_vectors(vectors=vecs, metadata=metas, ids=ids)
    if success:
        print(f"[RAG] Qdrant upsert done: {len(vecs)} vectors indexed")
    else:
        print("[RAG] Qdrant upsert failed")
        raise RuntimeError("Failed to index vectors to Qdrant")
def embed_query(query: str) -> List[float]:
    """
    Embed one query string into a float vector of the configured dimension.

    Uses the embedder from ``get_text_embedder``. Array outputs are converted
    with ``tolist``. A batch-shaped result, meaning a list/tuple whose first
    element is itself a sequence or array (e.g. ``[[...]]`` or
    ``[ndarray]``), is reduced to its first row. The vector is zero-padded
    or truncated to ``get_dimension(384)`` entries. If encoding or
    conversion fails, an all-zero vector is returned.
    """
    embedder = get_text_embedder()
    dimension = get_dimension(384)
    try:
        vec = embedder.encode(query)

        if hasattr(vec, "tolist"):
            vec = vec.tolist()

        # Batch-shaped output: keep the first row (which may itself be an array).
        if isinstance(vec, (list, tuple)) and vec and hasattr(vec[0], "__len__"):
            vec = vec[0]
            if hasattr(vec, "tolist"):
                vec = vec.tolist()

        result = [float(x) for x in vec]

        if len(result) != dimension:
            print(f"[WARNING] Query向量维度异常: 期望{dimension}, 实际{len(result)}")
            if len(result) < dimension:
                result.extend([0.0] * (dimension - len(result)))
            else:
                result = result[:dimension]

        return result
    except Exception as e:
        print(f"[WARNING] Query embedding failed: {e}")
        return [0.0] * dimension
def search_vectors(
    store = None,
    query: str = "",
    top_k: int = 8,
    rag_namespace: Optional[str] = None,
    only_rag_data: bool = True,
    score_threshold: Optional[float] = None
) -> List[Dict]:
    """
    Run a similarity search over RAG chunks for ``query``.

    Results are always restricted to ``memory_type == "rag_chunk"``. With
    ``only_rag_data`` the RAG markers (``is_rag_data``, ``data_source``) are
    also required, and ``rag_namespace`` narrows to one namespace. Returns
    ``[]`` for an empty query or when the store search raises.
    """
    if not query:
        return []

    if store is None:
        store = _create_default_vector_store()

    query_vector = embed_query(query)

    where = {"memory_type": "rag_chunk"}
    if only_rag_data:
        where.update(is_rag_data=True, data_source="rag_pipeline")
    if rag_namespace:
        where["rag_namespace"] = rag_namespace

    try:
        hits = store.search_similar(
            query_vector=query_vector,
            limit=top_k,
            score_threshold=score_threshold,
            where=where,
        )
    except Exception as e:
        print(f"[WARNING] RAG search failed: {e}")
        return []
    return hits
- def _prompt_mqe(query: str, n: int) -> List[str]:
- try:
- from core.llm import HelloAgentsLLM
- llm = HelloAgentsLLM()
- prompt = [
- {"role": "system", "content": "你是检索查询扩展助手。生成语义等价或互补的多样化查询。使用中文,简短,避免标点。"},
- {"role": "user", "content": f"原始查询:{query}\n请给出{n}个不同表述的查询,每行一个。"}
- ]
- text = llm.invoke(prompt)
- lines = [ln.strip("- \t") for ln in (text or "").splitlines()]
- outs = [ln for ln in lines if ln]
- return outs[:n] or [query]
- except Exception:
- return [query]
- def _prompt_hyde(query: str) -> Optional[str]:
- try:
- from core.llm import HelloAgentsLLM
- llm = HelloAgentsLLM()
- prompt = [
- {"role": "system", "content": "根据用户问题,先写一段可能的答案性段落,用于向量检索的查询文档(不要分析过程)。"},
- {"role": "user", "content": f"问题:{query}\n请直接写一段中等长度、客观、包含关键术语的段落。"}
- ]
- return llm.invoke(prompt)
- except Exception:
- return None
def search_vectors_expanded(
    store = None,
    query: str = "",
    top_k: int = 8,
    rag_namespace: Optional[str] = None,
    only_rag_data: bool = True,
    score_threshold: Optional[float] = None,
    enable_mqe: bool = False,
    mqe_expansions: int = 2,
    enable_hyde: bool = False,
    candidate_pool_multiplier: int = 4,
) -> List[Dict]:
    """
    Search RAG vectors with optional query expansion and merge the hits.

    The original query is always searched. With ``enable_mqe``, up to
    ``mqe_expansions`` LLM rephrasings are added. With ``enable_hyde``, an
    LLM-written hypothetical answer passage is added. Empty and duplicate
    variants are removed.

    A candidate pool of ``max(top_k * candidate_pool_multiplier, 20)`` is
    split evenly across the variants. Hits are merged by memory id, keeping
    the highest score, and the ``top_k`` best are returned. A variant whose
    search raises is skipped with a warning rather than aborting the whole
    search.
    """
    if not query:
        return []

    if store is None:
        store = _create_default_vector_store()

    variants: List[str] = [query]
    if enable_mqe and mqe_expansions > 0:
        variants.extend(_prompt_mqe(query, mqe_expansions))
    if enable_hyde:
        hyde_text = _prompt_hyde(query)
        if hyde_text:
            variants.append(hyde_text)

    # De-duplicate while preserving order, dropping empty variants.
    expansions: List[str] = []
    for v in variants:
        if v and v not in expansions:
            expansions.append(v)

    # Distribute the candidate pool across the variants.
    pool = max(top_k * candidate_pool_multiplier, 20)
    per = max(1, pool // max(1, len(expansions)))

    where = {"memory_type": "rag_chunk"}
    if only_rag_data:
        where["is_rag_data"] = True
        where["data_source"] = "rag_pipeline"
    if rag_namespace:
        where["rag_namespace"] = rag_namespace

    best: Dict[str, Dict] = {}
    for q in expansions:
        try:
            qv = embed_query(q)
            hits = store.search_similar(query_vector=qv, limit=per, score_threshold=score_threshold, where=where)
        except Exception as e:
            print(f"[WARNING] RAG expanded search failed for a query variant: {e}")
            continue
        for h in hits or []:
            mid = h.get("metadata", {}).get("memory_id", h.get("id"))
            s = float(h.get("score", 0.0))
            if mid not in best or s > float(best[mid].get("score", 0.0)):
                best[mid] = h

    merged = sorted(best.values(), key=lambda x: float(x.get("score", 0.0)), reverse=True)
    return merged[:top_k]
- def _try_load_cross_encoder(model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
- try:
- from sentence_transformers import CrossEncoder
- return CrossEncoder(model_name)
- except Exception:
- return None
def rerank_with_cross_encoder(query: str, items: List[Dict], model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2", top_k: int = 10) -> List[Dict]:
    """
    Rerank ``items`` by cross-encoder relevance of their ``content`` to ``query``.

    Each scored item is annotated in place with ``rerank_score``. The top
    ``top_k`` items by that score are returned as a new list; the caller's
    list order is not changed.

    Returns:
        ``[]`` for empty ``items`` (no model is loaded). When the model is
        unavailable or scoring fails (a warning is printed), the first
        ``top_k`` items are returned in their original order.
    """
    if not items:
        return []
    ce = _try_load_cross_encoder(model_name)
    if ce is None:
        return items[:top_k]
    pairs = [[query, it.get("content", "")] for it in items]
    try:
        scores = ce.predict(pairs)
        for it, s in zip(items, scores):
            it["rerank_score"] = float(s)
    except Exception as e:
        print(f"[WARNING] Cross-encoder rerank failed: {e}")
        return items[:top_k]
    ranked = sorted(items, key=lambda x: x.get("rerank_score", x.get("score", 0.0)), reverse=True)
    return ranked[:top_k]
def compute_graph_signals_from_pool(vector_hits: List[Dict], same_doc_weight: float = 1.0, proximity_weight: float = 1.0, proximity_window_chars: int = 1600) -> Dict[str, float]:
    """
    Score each hit by how clustered it is with other hits from the same document.

    Raw signal per hit: ``same_doc_weight * density + proximity_weight * proximity``.

    - ``density`` is the hit's document hit count divided by the largest
      per-document count.
    - ``proximity`` sums ``1 - dist / window`` over same-document hits whose
      ``start`` offsets are within ``proximity_window_chars`` of this hit.

    Hits without a ``doc_id`` are grouped by memory id. Signals are then
    divided by the maximum signal (when positive).

    Returns:
        Mapping of memory id to normalized graph signal.
    """
    def meta_of(hit: Dict) -> Dict:
        return hit.get("metadata", {})

    def start_of(hit: Dict):
        return meta_of(hit).get("start", 0)

    groups: Dict[str, List[Dict]] = {}
    for hit in vector_hits:
        key = meta_of(hit).get("doc_id") or meta_of(hit).get("memory_id") or hit.get("id")
        groups.setdefault(key, []).append(hit)

    largest_group = max((len(g) for g in groups.values()), default=1)
    window = max(1.0, float(proximity_window_chars))

    signals: Dict[str, float] = {}
    for group in groups.values():
        group.sort(key=start_of)
        density = len(group) / largest_group
        for idx, hit in enumerate(group):
            mid = meta_of(hit).get("memory_id", hit.get("id"))
            here = start_of(hit)
            proximity = 0.0
            # Walk left, then right; the group is sorted, so stop at the first
            # neighbour outside the window.
            for step in (-1, 1):
                j = idx + step
                while 0 <= j < len(group):
                    dist = abs(here - start_of(group[j]))
                    if dist > proximity_window_chars:
                        break
                    proximity += max(0.0, 1.0 - (dist / window))
                    j += step
            signals[mid] = signals.get(mid, 0.0) + (same_doc_weight * density + proximity_weight * proximity)

    if signals:
        peak = max(signals.values())
        if peak > 0:
            signals = {k: v / peak for k, v in signals.items()}
    return signals
def rank(vector_hits: List[Dict], graph_signals: Optional[Dict[str, float]] = None, w_vector: float = 0.7, w_graph: float = 0.3) -> List[Dict]:
    """
    Blend vector similarity with graph signals and sort hits best-first.

    The final score is ``w_vector * vector_score + w_graph * graph_score``.
    A hit missing from ``graph_signals`` gets a graph score of 0. Each
    returned item carries ``memory_id``, ``score``, ``vector_score``,
    ``graph_score``, ``content`` (from metadata) and ``metadata``.
    """
    signals = graph_signals or {}

    def to_item(hit: Dict) -> Dict:
        meta = hit.get("metadata", {})
        mid = meta.get("memory_id", hit.get("id"))
        g = float(signals.get(mid, 0.0))
        v = float(hit.get("score", 0.0))
        return {
            "memory_id": mid,
            "score": w_vector * v + w_graph * g,
            "vector_score": v,
            "graph_score": g,
            "content": meta.get("content", ""),
            "metadata": meta,
        }

    return sorted((to_item(h) for h in vector_hits), key=lambda item: item["score"], reverse=True)
def merge_snippets(ranked_items: List[Dict], max_chars: int = 1200) -> str:
    """
    Concatenate item contents, in order, into one context string.

    Contents are stripped, empty ones skipped, and pieces joined with blank
    lines. ``max_chars`` limits the snippet text only (separators are not
    counted). The first piece that would overflow is truncated to the
    remaining budget, and nothing after it is added.
    """
    pieces: List[str] = []
    budget = max_chars
    for item in ranked_items:
        snippet = item.get("content", "").strip()
        if not snippet:
            continue
        if len(snippet) > budget:
            if budget > 0:
                pieces.append(snippet[:budget])
            break
        pieces.append(snippet)
        budget -= len(snippet)
    return "\n\n".join(pieces)
def expand_neighbors_from_pool(selected: List[Dict], pool: List[Dict], neighbors: int = 1, max_additions: int = 5) -> List[Dict]:
    """
    Add chunks adjacent (by start offset, same document) to the selected ones.

    Pool items are grouped by ``metadata["doc_id"]`` and ordered by
    ``metadata["start"]``. For each selected item, found in its group by
    ``memory_id``, the positions 1..``neighbors`` away are examined, nearest
    first and left before right. Items not yet included are added until
    ``max_additions`` have been added.

    Returns:
        ``selected`` unchanged when there is nothing to do (empty inputs,
        ``neighbors <= 0`` or ``max_additions <= 0``). Otherwise, a new list
        of selected items plus additions, sorted by ``rerank_score`` (falling
        back to ``score``) descending.
    """
    # max_additions <= 0 previously still let one neighbour through.
    if not selected or not pool or neighbors <= 0 or max_additions <= 0:
        return selected

    def start_of(item: Dict):
        return item.get("metadata", {}).get("start", 0)

    by_doc: Dict[str, List[Dict]] = {}
    for it in pool:
        did = it.get("metadata", {}).get("doc_id")
        if did:
            by_doc.setdefault(did, []).append(it)
    for arr in by_doc.values():
        arr.sort(key=start_of)

    selected_ids = {it.get("memory_id") for it in selected}
    additions: List[Dict] = []

    def collect() -> None:
        # Returns as soon as the addition budget is used up.
        for it in selected:
            did = it.get("metadata", {}).get("doc_id")
            if not did or did not in by_doc:
                continue
            arr = by_doc[did]
            idx = next((k for k, x in enumerate(arr) if x.get("memory_id") == it.get("memory_id")), None)
            if idx is None:
                continue
            for offset in range(1, neighbors + 1):
                for j in (idx - offset, idx + offset):
                    if not 0 <= j < len(arr):
                        continue
                    cand = arr[j]
                    mid = cand.get("memory_id")
                    if mid in selected_ids:
                        continue
                    additions.append(cand)
                    selected_ids.add(mid)
                    if len(additions) >= max_additions:
                        return

    collect()

    extended = list(selected) + additions
    extended.sort(key=lambda x: x.get("rerank_score", x.get("score", 0.0)), reverse=True)
    return extended
def merge_snippets_grouped(ranked_items: List[Dict], max_chars: int = 1200, include_citations: bool = True) -> str:
    """
    Build a context string grouped by source document, optionally with citations.

    Ordering:
    - Items are grouped by ``doc_id``, falling back to ``source_path`` and
      then ``"unknown"``.
    - Groups are ordered by the sum of their items' ``score``, highest first.
    - Items within a group are ordered by ``start`` offset.

    Snippets and budget:
    - Each snippet is stripped and, with ``include_citations``, suffixed
      with `` [n]``.
    - ``max_chars`` bounds snippet text plus suffixes; separators are not
      counted.
    - A snippet that does not fit is cut so it and its suffix fit, and the
      rest of its group is skipped. If it is cut to nothing, it is dropped
      without a citation.
    - Assembly stops once the budget is used up.

    When citations are enabled and at least one was emitted, a
    ``References:`` section is appended. It lists each source path (or doc
    id), its offsets and its heading path.
    """
    groups: Dict[str, List[Dict]] = {}
    totals: Dict[str, float] = {}
    for item in ranked_items:
        meta = item.get("metadata", {})
        key = meta.get("doc_id") or meta.get("source_path") or "unknown"
        groups.setdefault(key, []).append(item)
        totals[key] = totals.get(key, 0.0) + float(item.get("score", 0.0))

    doc_order = sorted(groups, key=lambda d: totals.get(d, 0.0), reverse=True)
    for key in doc_order:
        groups[key].sort(key=lambda x: x.get("metadata", {}).get("start", 0))

    pieces: List[str] = []
    references: List[Dict] = []
    used = 0

    def cite(item: Dict) -> None:
        m = item.get("metadata", {})
        references.append({
            "index": len(references) + 1,
            "source_path": m.get("source_path"),
            "doc_id": m.get("doc_id"),
            "start": m.get("start"),
            "end": m.get("end"),
            "heading_path": m.get("heading_path"),
        })

    for key in doc_order:
        for item in groups[key]:
            snippet = (item.get("content", "") or "").strip()
            if not snippet:
                continue
            marker = f" [{len(references) + 1}]" if include_citations else ""
            need = len(snippet) + len(marker)
            if used + need <= max_chars:
                pieces.append(snippet + marker)
                used += need
                if include_citations:
                    cite(item)
                continue
            # Overflow: fit what we can of this snippet, then leave the group.
            remain = max_chars - used
            if remain > 0:
                clipped = snippet[: max(0, remain - len(marker))]
                if clipped:
                    pieces.append(clipped + marker)
                    used += len(clipped) + len(marker)
                    if include_citations:
                        cite(item)
            break
        if used >= max_chars:
            break

    merged = "\n\n".join(pieces)
    if not (include_citations and references):
        return merged

    lines: List[str] = [merged, "", "References:"]
    for ref in references:
        has_span = ref.get("start") is not None and ref.get("end") is not None
        loc = f" ({ref['start']}-{ref['end']})" if has_span else ""
        hp = f" – {ref['heading_path']}" if ref.get("heading_path") else ""
        sp = ref.get("source_path") or ref.get("doc_id") or "source"
        lines.append(f"[{ref['index']}] {sp}{loc}{hp}")
    return "\n".join(lines)
def compress_ranked_items(ranked_items: List[Dict], enable_compression: bool = True, max_per_doc: int = 2, join_gap: int = 200) -> List[Dict]:
    """
    Compress ranked items with direct parameters instead of environment variables.

    Walks ``ranked_items`` in order. Items are keyed per document by
    ``metadata["doc_id"]``, falling back to ``metadata["source_path"]`` and
    then ``"unknown"``. For each document:

    * the first item seen is always kept;
    * a later item is merged into the document's most recently kept or
      merged item when it starts at or after that item's ``start`` and no
      more than ``join_gap`` characters after its ``end``. On merge, the
      contents are joined with a blank line, ``end`` is extended and the
      higher ``score`` is kept;
    * any other item is kept as a new entry while the document has fewer
      than ``max_per_doc`` entries, and dropped otherwise.

    A missing ``start`` counts as 0. A missing ``end`` counts as ``start``
    plus the content length.

    Kept items are shallow copies with their own ``metadata`` dict, so
    merging never mutates the caller's items.

    Args:
        ranked_items: Items carrying ``content``, ``score`` and ``metadata``.
        enable_compression: When false, ``ranked_items`` is returned as-is.
        max_per_doc: Cap on separate (non-merged) entries per document,
            excluding the always-kept first item's special case.
        join_gap: Maximum character gap between adjacent spans for a merge.

    Returns:
        The compressed list, or ``ranked_items`` itself when disabled.
    """
    if not enable_compression:
        return ranked_items

    def _detach(item: Dict) -> Dict:
        # Own copy of the item and its metadata, so later merges stay local.
        copy = dict(item)
        copy["metadata"] = dict(item.get("metadata", {}) or {})
        return copy

    by_doc_count: Dict[str, int] = {}
    last_by_doc: Dict[str, Dict] = {}
    new_items: List[Dict] = []
    for it in ranked_items:
        meta = it.get("metadata", {}) or {}
        did = meta.get("doc_id") or meta.get("source_path") or "unknown"
        start = int(meta.get("start") or 0)
        end = int(meta.get("end") or (start + len(it.get("content", "") or "")))
        if did not in last_by_doc:
            kept = _detach(it)
            last_by_doc[did] = kept
            by_doc_count[did] = 1
            new_items.append(kept)
            continue
        last = last_by_doc[did]
        lmeta = last["metadata"]
        lstart = int(lmeta.get("start") or 0)
        lend = int(lmeta.get("end") or (lstart + len(last.get("content", "") or "")))
        if start - lend <= join_gap and start >= lstart:
            # Merge into the previous entry of this document.
            merged_text = (last.get("content", "") or "").strip()
            add_text = (it.get("content", "") or "").strip()
            if add_text:
                merged_text = f"{merged_text}\n\n{add_text}" if merged_text else add_text
            last["content"] = merged_text
            lmeta["end"] = max(lend, end)
            try:
                last["score"] = max(float(last.get("score", 0.0)), float(it.get("score", 0.0)))
            except (TypeError, ValueError):
                # Non-numeric score on either side: keep the existing score.
                pass
        else:
            cnt = by_doc_count.get(did, 0)
            if cnt >= max_per_doc:
                continue
            kept = _detach(it)
            new_items.append(kept)
            last_by_doc[did] = kept
            by_doc_count[did] = cnt + 1
    return new_items
def tldr_summarize(text: str, bullets: int = 3) -> Optional[str]:
    """Summarize ``text`` into a short bullet list using the project LLM.

    ``bullets`` is clamped to the range 1..5 and placed in the prompt.

    Best-effort: returns ``None`` for empty or whitespace-only input and for
    any failure along the way. Failures include the import, client
    construction, a non-integer ``bullets``, or the ``invoke`` call.
    Otherwise returns whatever ``HelloAgentsLLM.invoke`` returns.
    """
    try:
        if not (text and text.strip()):
            return None
        from core.llm import HelloAgentsLLM

        client = HelloAgentsLLM()
        count = max(1, min(5, int(bullets)))
        system_msg = {"role": "system", "content": "请将以下内容概括为简洁的要点列表(最多3-5条),用中文,避免重复,突出关键信息。"}
        user_msg = {"role": "user", "content": f"请用 {count} 条要点总结:\n\n{text}"}
        return client.invoke([system_msg, user_msg])
    except Exception:
        return None
- # ==================
- # High-level RAG Pipeline API
- # ==================
def create_rag_pipeline(
    qdrant_url: Optional[str] = None,
    qdrant_api_key: Optional[str] = None,
    collection_name: str = "hello_agents_rag_vectors",
    rag_namespace: str = "default"
) -> Dict[str, Any]:
    """
    Build a RAG pipeline bound to one Qdrant collection and one namespace.

    The vector size comes from ``get_dimension(384)``; presumably 384 is a
    fallback dimension (TODO confirm against ``get_dimension``). The
    collection uses cosine distance.

    Returns:
        Dict with the vector store under ``"store"``, the namespace under
        ``"namespace"``, and closures ``"add_documents"``, ``"search"``,
        ``"search_advanced"`` and ``"get_stats"`` that operate on them.
    """
    vector_store = QdrantVectorStore(
        url=qdrant_url,
        api_key=qdrant_api_key,
        collection_name=collection_name,
        vector_size=get_dimension(384),
        distance="cosine",
    )

    def add_documents(file_paths: List[str], chunk_size: int = 800, chunk_overlap: int = 100):
        """Chunk and index ``file_paths`` into this namespace; return the chunk count."""
        doc_chunks = load_and_chunk_texts(
            paths=file_paths,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            namespace=rag_namespace,
            source_label="rag",
        )
        index_chunks(store=vector_store, chunks=doc_chunks, rag_namespace=rag_namespace)
        return len(doc_chunks)

    def search(query: str, top_k: int = 8, score_threshold: Optional[float] = None):
        """Plain vector search over this namespace."""
        return search_vectors(
            store=vector_store,
            query=query,
            top_k=top_k,
            rag_namespace=rag_namespace,
            score_threshold=score_threshold,
        )

    def search_advanced(
        query: str,
        top_k: int = 8,
        enable_mqe: bool = False,
        enable_hyde: bool = False,
        score_threshold: Optional[float] = None
    ):
        """Vector search with optional MQE / HyDE query expansion."""
        return search_vectors_expanded(
            store=vector_store,
            query=query,
            top_k=top_k,
            rag_namespace=rag_namespace,
            enable_mqe=enable_mqe,
            enable_hyde=enable_hyde,
            score_threshold=score_threshold,
        )

    def get_stats():
        """Return the store's collection statistics."""
        return vector_store.get_collection_stats()

    return dict(
        store=vector_store,
        namespace=rag_namespace,
        add_documents=add_documents,
        search=search,
        search_advanced=search_advanced,
        get_stats=get_stats,
    )
|