logo
0
0
WeChat Login
feat: 添加MCP实时行情数据模块

A股智能操盘系统 v3.0 - BigQuant架构

基于BigQuant专业量化平台架构的智能交易系统

🚀 v3.0重大更新

核心改进

  1. 全新架构: 完全参考BigQuant专业量化平台设计
  2. 流水线式数据处理: 股票池→因子→过滤→评分→仓位→执行→评估
  3. 三级风控体系: 市场级+组合级+个股级
  4. 统一数据对齐: 所有数据以(date, instrument)为主键
  5. 策略基类框架: 支持快速开发自定义策略
  6. MCP实时行情: 通过MCP协议获取通达信截图+豆包OCR识别
  7. 本地数据库: SQLite存储历史行情和识别结果

与v2.0对比

维度v2.0v3.0
架构Python Agent体系BigQuant流水线架构
数据处理分散在Agent统一DataEngine
风控体系简单止盈止损三级风控(市场/组合/个股)
策略开发Agent协调BaseStrategy基类
数据源Akshare/TushareMCP+通达信+豆包+数据库
代码量~5000行~4000行核心代码
可维护性

📋 系统架构

用户界面层 ├─ Grafana Dashboard ├─ CodeBuddy CLI └─ API接口(可选) ↓ 核心引擎层 ├─ DataEngine (数据引擎) ├─ StrategyEngine (策略引擎) ├─ RiskEngine (风控引擎) ├─ BacktestEngine (回测引擎) └─ ExecutionEngine (执行引擎) ↓ 业务逻辑层 ├─ Strategies (策略库) ├─ Factors (因子库) ├─ RiskControl (风控库) ├─ Execution (执行管理) └─ Monitoring (监控告警) ↓ 数据访问层 ├─ StockDB (行情数据) ├─ DataStore (存储引擎) └─ Cache (缓存) ↓ 外部数据源 ├─ MCP+通达信+豆包 (实时行情) ├─ Akshare ├─ Tushare └─ Local CSV

🏗️ 核心模块

1. DataEngine (数据引擎)

类似BigQuant DAI,提供统一的数据处理流水线:

from core.data_engine import DataEngine engine = DataEngine() # 1. 构建股票池 universe = engine.build_universe( exchanges=["SH", "SZ"], sectors=["主板", "创业板"], min_market_cap=1000000000 # >10亿 ) # 2. 计算因子 features = engine.compute_features( universe, expr_list=["close / lag(close, 20) - 1 AS return_20d"] ) # 3. 应用过滤 filtered = engine.apply_filters( features, filter_conditions=["suspended = 0", "st_status = 0"] ) # 4. 评分排序 ranked = engine.rank_scores( filtered, score_field="return_20d", order="DESC" ) # 5. 生成仓位 positions = engine.generate_positions( ranked, hold_count=10, total_position=1.0 )

2. RiskEngine (风控引擎)

三级风控体系:

from core.risk_engine import RiskEngine engine = RiskEngine() # 风控检查 actions, target_position = engine.check( market_data, # 市场数据 portfolio, # 投资组合 current_date # 当前日期 ) # 市场级风控 # Level 0 (正常): 100%仓位 # Level 1 (谨慎): 60%仓位 # Level 2 (高风险): 30%仓位 # Level 3 (极端): 0%仓位(清仓) # 组合级风控 # - 单票仓位上限: 10% # - 总仓位上限: 95% # - 行业集中度: < 50% # 个股级风控 # - 止盈: +30%(移动止盈+8%回撤) # - 止损: -15%(龙头股放宽至-20%) # - 持仓时间: > 90天检查

3. MCP实时行情 (新增)

通过MCP协议获取通达信实时行情截图,并使用豆包模型识别:

from core.data_engine import DataEngine # 启用MCP数据源 engine = DataEngine( use_mcp=True, mcp_server_url="http://192.168.1.200:8080/mcp", db_path="data/market_data.db" ) # 获取单个股票实时行情 df = engine.get_realtime_market_data("600519.SH") print(df[['instrument', 'open', 'high', 'low', 'close', 'change_pct']]) # 批量获取 batch_df = engine.batch_get_realtime_market_data([ "600519.SH", "000858.SZ", "300750.SZ" ]) # 查询历史数据 history_df = engine.get_historical_market_data( "600519.SH", start_date="2024-01-01", end_date="2024-12-31" ) # 获取最新数据 latest_df = engine.get_latest_market_data(instrument="600519.SH")

MCP数据流:

Linux侧 (本项目) ↓ HTTP请求 Windows侧 (MCP服务器) ↓ 截图 通达信软件 ↓ Base64图片 豆包视觉模型 ↓ JSON数据 SQLite数据库

详见: /workspace/docs/MCP服务器设计文档.md

4. StrategyEngine (策略引擎)

策略开发框架:

from core.strategy_engine import BaseStrategy class MyStrategy(BaseStrategy): def __init__(self): super().__init__( name="my_strategy", universe_config={...}, feature_list=[...], filter_list=[...], hold_count=10, total_position=1.0, rebalance_period="5d" ) def generate_positions(self, date, universe, features): # 选择收益率最高的10只股票 top10 = features.nlargest(10, 'return_20d') weight = 1.0 / 10 return {ins: weight for ins in top10['instrument']}

🎯 策略示例

动量策略

from strategies.momentum_strategy import MomentumStrategy strategy = MomentumStrategy( hold_count=10, # 持仓10只 lookback_days=20, # 20日动量 rebalance_days=5 # 5交易日调仓 )

进攻型策略

from strategies.momentum_strategy import AggressiveMomentumStrategy strategy = AggressiveMomentumStrategy() # 专注科创板、创业板 # 单票上限15% # 动态调整持仓数量

📊 三级风控体系

市场级风控

风险等级触发条件仓位动作
正常默认状态100%仓位
谨慎5日跌幅 > -7%降至60%
高风险5日跌幅 > -10%降至30%
极端跌破年线+高波动清仓

组合级风控

  • 单票仓位上限: 10%(进攻型15%)
  • 总仓位上限: 95%(保留5%现金)
  • 行业集中度: < 50%

个股级风控

  • 止盈: +30%(移动止盈,最高点回撤8%)
  • 止损: -15%(龙头股放宽至-20%)
  • 持仓时间: > 90天检查
  • 连续跌停: 连续3个跌停板清仓

🚀 快速开始

安装依赖

cd /workspace pip install -r requirements.txt

运行示例

# 动量策略示例 python examples/example_momentum.py

开发新策略

  1. 继承BaseStrategy基类
  2. 实现generate_positions方法
  3. 注册到StrategyEngine
  4. 运行回测

📁 目录结构

/workspace/ ├── core/ # 核心引擎 │ ├── data_engine.py # 数据引擎 │ ├── strategy_engine.py # 策略引擎 │ ├── risk_engine.py # 风控引擎 │ ├── backtest_engine.py # 回测引擎(待实现) │ └── execution_engine.py # 执行引擎(待实现) ├── strategies/ # 策略库 │ └── momentum_strategy.py ├── factors/ # 因子库(待实现) ├── risk_control/ # 风控模块(待实现) ├── execution/ # 执行模块(待实现) ├── monitoring/ # 监控模块(待实现) ├── examples/ # 示例 │ └── example_momentum.py ├── tests/ # 测试(待实现) └── docs/ # 文档 └── BigQuant架构重构设计文档.md

📈 实施进度

Phase 1: 核心引擎 (Week 1-2) - 50%

  • DataEngine实现
  • RiskEngine实现
  • StrategyEngine实现
  • BacktestEngine (待实现)
  • ExecutionEngine (待实现)

Phase 2: 策略框架 (Week 3-4) - 30%

  • BaseStrategy基类
  • MomentumStrategy
  • AggressiveMomentumStrategy
  • ValueStrategy (待实现)
  • MultiFactorStrategy (待实现)
  • 因子库 (待实现)

Phase 3: 监控与集成 (Week 5-6) - 0%

  • Monitoring模块
  • Grafana配置
  • 集成测试

Phase 4: 优化与部署 (Week 7-8) - 0%

  • 性能优化
  • 文档完善
  • 上线部署

🎯 成功标准

功能完整性

  • 核心引擎100%实现 (3/5)
  • 三级风控100%实现
  • 策略框架100%实现 (3/5)
  • BacktestEngine (待实现)
  • Grafana监控 (待实现)

性能指标

  • 数据查询 < 100ms
  • 因子计算 < 1s
  • 回测速度 > 100x
  • 内存占用 < 1GB

测试覆盖率

  • 单元测试 > 80%
  • 集成测试 > 70%

📚 参考资料

  • 设计文档: /workspace/docs/BigQuant架构重构设计文档.md
  • BigQuant学习资料: /workspace/docs/bigquant/
  • AImate项目: /workspace/AImate/

🤝 贡献指南

  1. Fork项目
  2. 创建功能分支
  3. 提交变更
  4. 推送到分支
  5. 创建Pull Request

⚠️ 免责声明

  1. 本系统仅提供决策建议,不自动执行交易
  2. 所有交易操作需由用户手动确认
  3. 市场数据以券商为准,系统数据仅供参考
  4. 投资有风险,入市需谨慎
  5. 本系统不构成投资建议,用户需自行承担投资风险

版本: v3.0 分支: refactor-bigquant-v3 归档分支: archive-v2 创建时间: 2026-01-21 维护者: AI Assistant