一个基于 LangChain、LangGraph、Milvus 的智能知识库系统,集成 RAG、Memory、MCP、Output Parser、Prompt Template 等先进技术。
RAG (检索增强生成)
Memory (记忆管理)
LangGraph (工作流编排)
MCP (Model Context Protocol)
Output Parser (输出解析)
Prompt Template (提示词模板)
| 技术 | 版本 | 用途 |
|---|---|---|
| LangChain | 0.3.7 | 核心框架 |
| LangGraph | 0.2.45 | 工作流编排 |
| LangChain-OpenAI | 0.2.5 | OpenAI 集成 |
| PyMilvus | 2.4.9 | 向量数据库 |
| sentence-transformers | 3.3.1 | 文本嵌入 |
| Pydantic | 2.9.2 | 数据验证 |
| pytest | 8.3.3 | 单元测试 |
┌─────────────────────────────────────────────────────┐ │ 用户交互层 │ │ (CLI / API / Web Interface) │ └────────────────────┬────────────────────────────────┘ │ ┌────────────────────▼────────────────────────────────┐ │ MCP 服务器层 │ │ (Model Context Protocol) │ └────────────────────┬────────────────────────────────┘ │ ┌────────────────────▼────────────────────────────────┐ │ LangGraph 工作流层 │ │ (分析 → 检索 → 生成 → 评估 → 记忆) │ └─┬───────────────┬───────────────┬───────────────────┘ │ │ │ ┌─▼───────┐ ┌───▼──────┐ ┌──▼──────────────┐ │ RAG引擎 │ │记忆管理 │ │ 输出解析器 │ │ │ │ │ │ │ └─┬───────┘ └──┬──────┘ └┬────────────────┘ │ │ │ ┌─▼───────┐ ┌──▼──────┐ ┌▼──────────────┐ │ Milvus │ │ LangChain│ │Prompt Templates│ │向量数据库│ │ LLM │ │ │ └─────────┘ └─────────┘ └────────────────┘
cd /workspace
pip install -r requirements.txt
cp .env.example .env
# 编辑 .env 文件,填入你的配置
使用 Docker 启动 Milvus:
docker run -d \ --name milvus-standalone \ -p 19530:19530 \ -p 9091:9091 \ -v /tmp/milvus:/var/lib/milvus \ milvusdb/milvus:latest
python -m pytest src/tests/ -v
from src.components.rag_engine import RAGEngine
from src.utils.document_loader import DocumentLoader
# 初始化
document_loader = DocumentLoader()
rag_engine = RAGEngine()
# 加载文档
documents = document_loader.load_directory("./data")
rag_engine.add_documents(documents)
# 查询
result = rag_engine.query("Python 有什么特点?")
print(result.answer)
from src.graph.workflow import KnowledgeAssistantGraph
# 创建工作流
graph = KnowledgeAssistantGraph()
# 运行
result = graph.run("什么是机器学习?")
print(result["answer"])
from src.components.mcp_server import MCPServer
from src.models import MCPRequest
# 初始化服务器
mcp_server = MCPServer()
# 发送请求
request = MCPRequest(
method="query",
params={"question": "LangChain 是什么?"},
request_id="001"
)
response = mcp_server.handle_request(request)
print(response.result)
from src.utils.document_loader import DocumentLoader
# 加载单个文件
doc = document_loader.load_file("document.txt")
# 加载整个目录
docs = document_loader.load_directory("./docs", recursive=True)
# 添加到知识库
rag_engine.add_documents(docs)
# 简单查询
result = rag_engine.query("你的问题")
# 带参数查询
result = rag_engine.query(
question="你的问题",
top_k=10
)
# 仅搜索不生成
docs = rag_engine.search("搜索词", top_k=5)
from src.components.memory_manager import MemoryManager
# 创建记忆管理器
memory = MemoryManager(session_id="user_123")
# 保存对话
memory.save_context(
inputs={"input": "你好"},
outputs={"output": "你好!有什么可以帮助你的?"}
)
# 获取历史
history = memory.get_chat_history()
# 导出历史
exported = memory.export_history()
from src.utils.templates import PromptTemplateManager
# 获取默认模板
rag_template = template_manager.get_rag_template()
qa_template = template_manager.get_qa_template()
# 创建自定义模板
custom = template_manager.create_custom_template(
template_name="my_template",
template_string="{input} -> {output}",
input_variables=["input", "output"]
)
# 使用模板
formatted = custom.format(input="问题", output="答案")
from src.utils.parsers import OutputParser
# 创建解析器
parser = OutputParser()
# 解析输出
result = parser.parse(json_output)
print(result.answer)
print(result.confidence)
from src.graph.workflow import KnowledgeAssistantGraph
# 创建工作流
graph = KnowledgeAssistantGraph()
# 查看工作流图
print(graph.get_graph_info())
# 运行工作流
result = graph.run("用户问题")
# 流式运行
for step in graph.stream("用户问题"):
print(step)
__init__(embedding_util, collection_name) - 初始化add_documents(documents) - 添加文档search(query, top_k) - 搜索文档query(question, top_k) - 查询并生成答案delete_document(doc_id) - 删除文档get_collection_stats() - 获取统计信息__init__(session_id) - 初始化save_context(inputs, outputs) - 保存上下文load_memory_variables() - 加载记忆变量get_chat_history() - 获取对话历史clear() - 清空记忆__init__(rag_engine) - 初始化register_handler(method, handler) - 注册处理器handle_request(request) - 处理请求list_methods() - 列出方法get_method_schema(method) - 获取方法schemapytest src/tests/ -v
pytest src/tests/test_embeddings.py -v pytest src/tests/test_parsers.py -v pytest src/tests/test_memory_manager.py -v
pytest src/tests/ --cov=src --cov-report=html
smart-knowledge-assistant/ ├── config/ # 配置文件 │ └── __init__.py # 设置管理 ├── src/ │ ├── components/ # 核心组件 │ │ ├── rag_engine.py # RAG引擎 │ │ ├── memory_manager.py # 记忆管理 │ │ └── mcp_server.py # MCP服务器 │ ├── graph/ # LangGraph工作流 │ │ └── workflow.py # 工作流定义 │ ├── utils/ # 工具类 │ │ ├── embeddings.py # 嵌入工具 │ │ ├── parsers.py # 输出解析 │ │ ├── templates.py # 提示词模板 │ │ └── document_loader.py # 文档加载 │ ├── models/ # 数据模型 │ │ └── __init__.py │ └── tests/ # 单元测试 │ ├── test_embeddings.py │ ├── test_parsers.py │ ├── test_memory_manager.py │ └── ... ├── examples/ # 使用示例 │ ├── basic_usage.py │ ├── workflow_example.py │ └── mcp_example.py ├── docs/ # 文档 │ └── README.md ├── main.py # 主程序入口 ├── requirements.txt # 依赖列表 └── .env.example # 环境变量示例
A: 请确保 Milvus 服务正在运行:
# 检查 Milvus 容器
docker ps | grep milvus
# 查看日志
docker logs milvus-standalone
A: 修改 .env 文件中的 EMBEDDING_MODEL 配置,或在代码中指定:
from src.utils.embeddings import EmbeddingUtil
embedder = EmbeddingUtil(model_name="your-model-name")
A: 使用 register_handler 方法:
def my_handler(params):
return {"result": "custom response"}
mcp_server.register_handler("my_method", my_handler)
A: 可以从以下方面优化:
top_k 参数nlist 参数A: 使用 stream 方法查看每一步的输出:
for step in graph.stream("问题"):
for node, output in step.items():
print(f"[{node}]: {output}")
欢迎提交 Issue 和 Pull Request!
MIT License