结构化抽取与问答平台
把结构化信息抽取、向量检索与有依据问答放进同一套文档工作流里,适用于多个垂直场景。
实际项目名叫 LangExtractApp。区分点不是「Chat with PDF」式的检索,而是抽取层 + 检索层 + 有依据问答——同一份文档、两种消费形态。底层用 Google 开源的 langextract 库做信息抽取,最大特性是每个抽取都带 char_interval 指回原文位置(source grounding)。
7 个生产场景 ship 在 backend/app/scenarios/
scenarios/
├── base.py # BaseScenario 抽象基类 + ScenarioRegistry
├── radiology.py # 放射学报告
├── medication.py # 药物信息
├── news.py # 新闻信息
├── finance.py # 金融分析
├── medical.py # 中医药机制研究
├── customer_service.py # 客服工单
└── sales.py # 销售商机
每个场景定义 4 件事:extract_classes(实体类别)、get_prompt()(LLM prompt)、get_examples()(few-shot ExampleData)、get_samples()(演示样本)。
BaseScenario + ScenarioRegistry
# backend/app/scenarios/base.py
class BaseScenario(ABC):
name: str = "基础场景"
description: str = "场景描述"
extract_classes: List[str] = []
@abstractmethod
def get_prompt(self) -> str: ...
@abstractmethod
def get_examples(self) -> List[lx.data.ExampleData]: ...
def get_samples(self) -> List[Dict[str, str]]:
return []
class ScenarioRegistry:
_scenarios: Dict[str, Type[BaseScenario]] = {}
@classmethod
def register(cls, scenario_id, scenario_class):
cls._scenarios[scenario_id] = scenario_class
@classmethod
def get(cls, scenario_id) -> BaseScenario:
if scenario_id not in cls._scenarios:
raise ValueError(f"Unknown scenario: {scenario_id}")
return cls._scenarios[scenario_id]()
@classmethod
def list_all(cls) -> Dict[str, Dict[str, Any]]:
return {sid: cls._scenarios[sid]().get_info()
for sid in cls._scenarios}
加新垂直 = 写 1 个 BaseScenario 子类 + 模块顶层调 ScenarioRegistry.register(...) 一次即可。/scenarios 端点直接返回 list_all()。
示范:放射学场景
# backend/app/scenarios/radiology.py
class RadiologyScenario(BaseScenario):
name = "放射学报告"
description = "从医学影像报告中提取结构化信息..."
extract_classes = ["检查类型", "临床指征", "检查技术", "发现", "印象", "建议"]
def get_examples(self) -> List[lx.data.ExampleData]:
return [
lx.data.ExampleData(
text="CT检查报告\n临床指征: 腹痛待查\n...",
extractions=[
lx.data.Extraction(
extraction_class="检查类型",
extraction_text="CT检查报告",
attributes={"类型": "CT"}
),
lx.data.Extraction(
extraction_class="发现",
extraction_text="胆囊壁轻度增厚,约4mm",
attributes={"部位": "胆囊", "significance": "minor"}
),
# ...
]
)
]
注意 attributes 的用法——它可以为同一个抽取附加任意元数据。药物信息场景用 medication_group 属性把「药物 ↔ 剂量 ↔ 频率 ↔ 用法 ↔ 疗程」连成一条记录,简单又有效。
核心提取器(backend/app/core/extractor.py)
class Extractor:
def __init__(self, settings: Settings):
self.settings = settings
self.cache = CacheManager(
cache_dir=settings.cache_dir,
enabled=settings.cache_enabled
)
def extract(self, text: str, scenario_id: str, use_cache: bool = True):
# 1. 文本预处理(sanitize.py 去脏字符 / 统一换行)
sanitized = sanitize_text(text)
# 2. 缓存命中?
if use_cache:
cached = self.cache.get(sanitized, scenario_id)
if cached:
cached["from_cache"] = True
return cached
# 3. 取场景(ScenarioRegistry 注册的)
scenario = ScenarioRegistry.get(scenario_id)
# 4. 真跑 LangExtract
result = lx.extract(
text_or_documents=sanitized,
prompt_description=scenario.get_prompt(),
examples=scenario.get_examples(),
model=self._get_model(),
fence_output=True,
use_schema_constraints=False,
prompt_validation_level=PromptValidationLevel.OFF,
show_progress=False,
)
# 5. 构建响应(含 char_interval + segments by class)
response = self._build_response(result, scenario_id, sanitized)
if use_cache:
self.cache.set(sanitized, scenario_id, response)
return response
_build_response 把每个 extraction 转换为 {extraction_class, extraction_text, attributes, char_interval: {start_pos, end_pos}},并按类聚合成 segments[](同类抽取可以有多个 intervals)。
双向量后端真的两个都实现了
backend/app/services/ 有 vector_store.py(Qdrant)和 vector_store_chroma.py(Chroma 持久化),通过 VECTOR_STORE_BACKEND 环境变量切换:
# backend/app/api/rag_routes.py:73-94
backend = settings.vector_store_backend.lower()
if backend == "chroma":
from app.services.vector_store_chroma import ChromaVectorStore
_vector_store = ChromaVectorStore(
collection_name="langextract_docs",
persist_directory=settings.chroma_persist_dir,
embedding_model=settings.embedding_model,
embedding_api_key=settings.dashscope_api_key,
embedding_base_url=settings.dashscope_base_url,
)
else:
from app.services.vector_store import VectorStore
_vector_store = VectorStore(
collection_name="langextract_docs",
qdrant_url=settings.qdrant_url,
qdrant_api_key=settings.qdrant_api_key,
embedding_model=settings.embedding_model,
embedding_api_key=settings.dashscope_api_key,
embedding_base_url=settings.dashscope_base_url,
)
_vector_store.init_collection(recreate=False)
两个类的 DocumentChunk 数据结构 + add_chunks() / search() / delete_by_doc_id() 方法签名完全一致——这是能在 rag_routes 里无缝切换的前提。
| 维度 | Qdrant (VectorStore) | Chroma (ChromaVectorStore) |
|---|---|---|
| 部署模式 | remote / 内存模式 fallback | 仅本地持久化 (chroma_db/) |
| 距离 | models.Distance.COSINE | metadata["hnsw:space"] = "cosine" |
| 过滤 | models.Filter / FieldCondition | where={"doc_id": {"$eq": ...}} |
| 配置 env | QDRANT_URL + QDRANT_API_KEY | CHROMA_PERSIST_DIR |
| Embedding | 共用 OpenAIEmbeddings(DashScope text-embedding-v4, chunk_size=10) | 同左 |
12 个 RAG 端点(backend/app/api/rag_routes.py)
POST /rag/pdf/parse # 从 URL 解析 PDF(MinerU 模式 vlm/pipeline)
POST /rag/pdf/upload # 上传本地 PDF (≤200MB, ≤600p)
GET /rag/pdf/task/{task_id} # 查解析任务状态
POST /rag/search # 语义搜索
POST /rag/qa # 智能问答
POST /rag/qa/stream # 流式问答
POST /rag/chat # 多轮对话
POST /rag/documents # 加文档到知识库
POST /rag/extractions # 加抽取结果到知识库
DELETE /rag/documents/{doc_id}
GET /rag/stats # 知识库统计
POST /rag/init # 初始化 / 重建
PDF 上传后的生产流水线(rag_routes.py:197-319):
1. validate .pdf + size ≤ 200MB
2. PDFParser.parse_uploaded_file(content, filename,
model_version="vlm"|"pipeline",
timeout=600) → MinerU 返回 markdown
3. markdown.split("\n\n") → DocumentChunk[paragraph_index, source="pdf_upload"]
4. vector_store.add_chunks(chunks) → 按 VECTOR_STORE_BACKEND 路由
5. optional: extract_after_parse=True + scenario → 跑 Extractor → 返回 extractions[]
6. response: PDFParseResponse(success, task_id, markdown, source,
parse_time, extractions[])
QAAgent(backend/app/services/qa_agent.py)
class QAAgent:
def search_context(self, query, top_k=5):
return self.vector_store.search(query, top_k)
def format_context(self, results):
# 拼成 "[来源 1] ..." 多段文本喂给 prompt
...
def build_prompt(self, question, context, structured=True):
system_prompt = """你是一个专业的知识问答助手。请用简洁的结构化格式回答问题。
输出格式要求:
1. 先用一句话总结答案
2. 然后分点列出主要内容,每点用"• "开头
3. 每个要点包含:标题、关键词(用【】标注)、详细说明
4. 最后可以加一个简短结论
重要:不要在回答中提及"来源"、"文档"、"参考"等词,直接陈述知识内容。"""
# ...
def answer(self, question, top_k=5, return_sources=True):
# 一次性 LLM invoke + 收回 sources[]
...
def answer_stream(self, question, top_k=5):
# 流式 yield chunks
for chunk in self.llm.stream(...):
if chunk.content:
yield chunk.content
def chat(self, ...):
# 多轮对话,注入历史 messages
...
注意 system prompt 让模型自己不要提及「来源」,但响应里仍然返回 sources[{doc_id, doc_title, content_preview, score}]——前端可以独立渲染「来源溯源」面板。这是体验上的设计:模型说人话,UI 显示证据。
价值点
- Source grounding 是审计性需求——医疗 / 法律 / 合规场景必须能追溯每条断言的原文位置
- 可插拔向量后端不是花架子:Qdrant remote 适合云端多机共享 / Chroma 本地持久化适合内网部署
- 抽取 + 检索 + QA 统一在一个 BaseScenario 下——加新垂直只需写 1 个 Python 类
Demo 真实材料对应
互动 Demo 里 3 个场景(放射学报告 / 药物信息 / 新闻信息)的文本和 extract_classes 完全从 backend/app/scenarios/{radiology,medication,news}.py 的 get_samples() 取出。char_interval 是模块加载时根据源文本真实计算的偏移量,不是装饰。点上面「打开 Demo」可看高亮 hover 显示真实 offset。