摘要:在前两篇文章中,我们通过在 QMT 内部植入策略作为“内鬼服务端”,使用原生 Socket 成功突破了券商版 QMT 的外部调用限制。但上一次实现的“一问一答”短连接模式无法满足实时行情的低延迟需求。本文将跨越"短连接"的局限,避开"长连接"改造中的重重陷阱,彻底打通 QMT 实时行情的主动推送通道!
编写、调试程序是很个性的工作,有群友抬举我,付费要我的代码,我不敢给啊,因为我不是专业程序员,靠着AI辅助编写的半瓶子醋的程序,自己使用没问题,因为我知道哪个按键可以点击,每个数字代表什么意思,出错了我也能立刻Debug。但如果收了钱给别人用,我根本没有时间精力去指导别人如何调试。我现在带着小白学习AI编程也不负责调试找错的啊。
所以呢,我只能把我的思路、基础的源码、我掉进去的坑通过文章发出来,感兴趣的读者多多点赞,让我知道你的认可,我才有心情回答你的提问。
今天继续AI编程,AI能做很多事情,也不能做很多事。几句话说不清楚,自己尝一尝才知道梨子的滋味。骂归骂,还是离不开她啊


在之前的两篇文章中,已经详细讲解了如何在银河QMT 限制 xtquant (MiniQMT)外部连接的情况下,通过“原生 Socket + QMT 内置 Python 策略”的架构,实现了外部程序对 QMT 的资产查询与安全下单。
提醒:以下内容偏技术, 不编程的读者,可直接关闭本文,编程的读者可以把文章直接喂给AI,让它总结可能你能用得着的。
如果你已经跟着跑通了前面的代码,相信你对这套架构已经不陌生了。但之前的架构存在一个关键瓶颈:它是一个基于“短连接”的系统。
短连接的特点是“一问一答,用完即关”。客户端连接端口 -> 发送 QUERY -> 接收结果 -> 关闭连接。
如果用来查持仓、下买单,这种模式非常稳定。但如果我们要获取实时 Tick 行情呢?难道要让外部程序写一个死循环,每隔 0.5 秒就重新建立一次 Socket 连接去问 QMT 吗? 这不仅极其消耗系统资源,延迟极高,更会导致 QMT 服务端因为频繁的连接/断开而直接崩溃。
我们需要的是一条长连接(Long Connection):客户端连上之后就不走了,QMT 一旦有新的行情跳动,就顺着这条一直敞开的通道,主动把数据推送(Push)出来。
从短连接升级到长连接,绝不仅仅是把客户端的 client.close() 删掉那么简单。这牵扯到网络编程的几个核心难点,也是大家最容易踩坑的地方:
很多人改造长连接时,只改了外部的 Python 客户端(让它死循环 recv 痴痴等待)。结果一跑就疯狂报错重连。原因在于:QMT 里面的策略(服务端)还是老逻辑,它发完一句 OK 就执行了 conn.close()。这就好比你赖在店里不走,老板却直接把你扫地出门。解决办法:服务端的架构必须大换血。主线程只负责接客(accept),接到客后,必须马上开启一个独立的子线程来专门服务这个客户,并且在一个 while True 循环里持续保持通信。
在短连接里,一次连接只发一次数据,收到就结束,不用担心数据串掉。 但在长连接里,TICK 行情就像水流一样源源不断地冲过来。如果不加处理,客户端可能会一次性收到 TICK,茅台...TICK,华宝油气... 粘在一起的半截字符串,直接导致解析崩溃。解决办法:引入协议边界。我们在发送的每条消息末尾强制加上换行符 \n。客户端接收时,设立一个 Buffer 缓冲区,严格按照 \n 来切割和提取完整消息。
一条即使没有报错的 TCP 连接,如果长时间没有数据传输,也极有可能被操作系统的底层网络机制或防火墙默默掐断(俗称“死连接”)。休市期间没有行情推送,最容易发生这种情况。解决办法:双向心跳(Heartbeat)。客户端每隔 20 秒主动发一句 PING\n,QMT 服务端收到后立刻回一句 PONG\n。这不仅能防断线,还能让双方随时确认对方的存活状态。
克服了以上陷阱后,我们的 QMT “内鬼”策略演进成了专业的长连接服务器。最核心在于,利用了一个全局列表来维护客户群,并借助 QMT 自带的 handlebar 实现了广播:
# 维护所有保持着长连接的客户端
g_active_clients = []
def client_handler(conn):
# 专属客服线程,永不挂断,处理 PING 和订阅指令
g_active_clients.append(conn)
while True:
data = conn.recv(1024)
if not data: break
# ... 解析与心跳回应 ...
g_active_clients.remove(conn)
def handlebar(ContextInfo):
# QMT 引擎的行情回调,每当有最新 Tick 时被触发
tick_data = ContextInfo.get_market_data(...)
msg = f"TICK,{tick_data.m_strInstrumentID},{tick_data.m_dLastPrice}\n"
# 核心:将行情广播给所有在线的客户端!
for client in g_active_clients:
client.sendall(msg.encode('utf-8'))
当这套架构跑起来,看着控制台里像瀑布一样丝滑滚动的 [实时行情] TICK...,那种突破系统限制的成就感,绝对是量化工程师最享受的时刻。

由于已经过了下午三点,并没有收到跳动的实时行情,而是长连接传来的收盘价。明天再试一试、
插一句:今天调试了一下通达信 API,也不错:


补一个代码片段:
import threading
g_context = None
g_api_lock = threading.Lock() # 保护 QMT 底层 API 的神圣并发锁
def process_command_sync(conn, cmd_str):
"""同步处理指令:直接在当前线程响应,告别排队和休眠超时"""
# ... 解析指令 ...
if action == 'QUERY_TICK':
with g_api_lock: # 拿钥匙进金库
ticks = g_context.get_full_tick([code])
# ... 原地组装结果并发送 ...
conn.sendall(response.encode('utf-8'))
elif action == 'BUY':
with g_api_lock: # 拿钥匙进金库
passorder(23, 1101, account, code, 11, price, vol, 'API', 1, msg, g_context)
conn.sendall(b'OK\n') # 绝不阻塞,发完即回
依靠原生 Socket,我们在银河QMT 环境里硬生生蹚出了一条路。但必须承认,自己维护底层网络并发、心跳和数据排队,始终是一件略显繁琐的“脏活累活”。
如果有得选,谁不想用官方封装好的高级接口呢?
最近我在联系其他券商的支持miniQMT的测试账号,这就意味着我可以解锁 QMT 的终极形态——MiniQMT (结合 xtquant 标准库) 了!
在后面的文章中,我们将暂别原生 Socket 的底层泥沼,去体验 xtquant 的丝滑连线,争取用寥寥数行代码,完成极速实盘级模拟下单与资产订阅。新地图即将开启,敬请期待!