蜗牛博客VNPY源码学习系列文章:
VNPY源码(一)CTP封装及K线合成
VNPY源码(二)API获取行情和script_trader
VNPY源码(三)主引擎MainEngine
VNPY源码(四)DataRecorder
VNPY源码(五)CtaEngine实盘引擎
VNPY源码(六)BacktesterEngine回测引擎
VNPY源码(七)限价单与停止单
VNPY源码(八)VNPY的数据流
一、init_cli_trading
init_cli_trading定义在C:\vnstudio\Lib\site-packages\vnpy\app\script_trader目录下的cli.py文件中。该目录下还有其他文件,但只需通过 from vnpy.app.script_trader import init_cli_trading 即可直接导入该函数。
在C:\vnstudio\Lib\site-packages\vnpy\gateway\ctptest新建test.py文件。
from vnpy.gateway.ctp.ctp_gateway import CtpGateway
from vnpy.app.script_trader import init_cli_trading

# Connection settings handed to engine.connect_gateway() below.
# The account values look like masked placeholders - replace them with
# real SimNow credentials before running.
setting = {
    "用户名": "1xx11",
    "密码": "axx4",
    "经纪商代码": "9999",
    "交易服务器": "180.168.146.187:10130",
    # The original value ended with a stray ";" that the trading-server
    # address above does not have; it has been removed.
    "行情服务器": "180.168.146.187:10131",
    "产品名称": "simnow_client_test",
    "授权编码": "0000000000000000",
    "产品信息": "11111",
}

# Create the command-line script engine with the CTP gateway registered,
# then connect the gateway named "CTP" using the settings above.
engine = init_cli_trading([CtpGateway])
engine.connect_gateway(setting, "CTP")
成果展示:
然后在Jupyter中运行以下命令:
# Query all contracts (use_df=True presumably returns a pandas DataFrame
# - confirm against the script engine).
engine.get_all_contracts(use_df=True)
其他命令:
# Symbol used by every market-data and order command below.
vt_symbol = "zn1910.SHFE"

# Query account funds.
engine.get_all_accounts(use_df=True)

# Query positions.
engine.get_all_positions(use_df=True)

# Query active orders.
engine.get_all_active_orders(use_df=True)

# Subscribe to market data.
engine.subscribe([vt_symbol])

# Query the latest tick.
engine.get_tick(vt_symbol, use_df=True)

# Place a buy order (arguments are presumably price 32 and volume 100 -
# confirm against the script engine's buy() signature).
vt_orderid = engine.buy(vt_symbol, 32, 100)
print(vt_orderid)

# Query that specific order.
engine.get_order(vt_orderid)

# Cancel the order.
engine.cancel_order(vt_orderid)
二、利用vnpy获取行情
from pathlib import Path
from threading import Thread
from time import sleep

from vnpy.api.ctp.vnctpmd import MdApi

# Event type identifiers passed as ``type`` to ``BeeMdApi.on_event``.
EVENT_LOG = "log"
EVENT_CONTRACT = "contract"
EVENT_TICK = "tick"
EVENT_BAR = "bar"
EVENT_ERROR = "error"
EVENT_POSITION = "position"
EVENT_TRADE = "trade"
EVENT_ORDER = "order"
EVENT_ACCOUNT = "account"
EVENT_SHARED = "shared"
EVENT_LAST = "last"
EVENT_INIT_FINISHED = "init"


def _get_trader_dir(temp_name: str):
    """
    Get the path the trader is running in and its temp folder.

    If a folder called ``temp_name`` exists in the current working
    directory, the working directory is used. Otherwise the user's home
    directory is used and the temp folder is created there if missing.

    Returns a ``(trader_dir, temp_dir)`` tuple of ``Path`` objects.
    """
    cwd = Path.cwd()
    temp_path = cwd.joinpath(temp_name)

    # A temp folder in the current working directory takes priority.
    if temp_path.exists():
        return cwd, temp_path

    # Otherwise fall back to the home path of the system.
    home_path = Path.home()
    temp_path = home_path.joinpath(temp_name)

    # exist_ok avoids the check-then-create race of exists() + mkdir().
    temp_path.mkdir(parents=True, exist_ok=True)
    return home_path, temp_path


def get_folder_path(folder_name: str) -> Path:
    """
    Get (creating it when needed) a folder inside the ``.ctpbee`` temp dir.

    ``folder_name`` may be nested (``connect`` passes ``"ctp/<userid>"``),
    so missing parent folders are created as well; a plain ``mkdir()``
    would raise ``FileNotFoundError`` for such a name on first use.
    """
    _, temp_dir = _get_trader_dir(".ctpbee")
    folder_path = temp_dir.joinpath(folder_name)
    folder_path.mkdir(parents=True, exist_ok=True)
    return folder_path


class BeeMdApi(MdApi):
    """
    Market-data API wrapper built on ``MdApi``.

    Tracks connection/login state, remembers subscribed symbols so they
    are subscribed again after every successful login, and reports
    log/error events through ``on_event``.
    """

    def __init__(self):
        """Initialise connection state and empty credentials."""
        super().__init__()

        self.gateway_name = "ctp"
        self.reqid = 0  # Incremented before each login request.

        self.connect_status = False
        self.login_status = False
        self.subscribed = set()  # Symbols requested via subscribe().

        self.userid = ""
        self.password = ""
        self.brokerid = 0

    @property
    def md_status(self):
        """Return True once the market-data login has succeeded."""
        return self.login_status

    def on_event(self, type, data):
        """Print an event; ``type`` is one of the ``EVENT_*`` constants."""
        print(f"事件类型{type}, data={data}")

    def onFrontConnected(self):
        """
        Callback when front server is connected.

        Marks the connection as up and immediately logs in.
        """
        self.connect_status = True
        self.on_event(type=EVENT_LOG, data="行情服务器连接成功")
        self.login()

    def onFrontDisconnected(self, reason: int):
        """
        Callback when front server is disconnected.

        Resets both the connection and the login state.
        """
        self.connect_status = False
        self.login_status = False
        self.on_event(type=EVENT_LOG, data=f"行情连接断开,原因{reason}")

    def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool):
        """
        Callback when user is logged in.

        On success every remembered symbol is subscribed; on failure the
        error dict is reported as an error event.
        """
        # Same "empty error or zero ErrorID means success" check as
        # onRspSubMarketData, so an empty error dict cannot raise here.
        if not error or not error["ErrorID"]:
            self.login_status = True
            self.on_event(type=EVENT_LOG, data="行情服务器登录成功")

            for symbol in self.subscribed:
                self.subscribeMarketData(symbol)
        else:
            error["detail"] = "行情登录失败"
            self.on_event(type=EVENT_ERROR, data=error)

    def onRspError(self, error: dict, reqid: int, last: bool):
        """
        Callback when error occurred.
        """
        error["detail"] = "行情接口报错"
        self.on_event(type=EVENT_ERROR, data=error)

    def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool):
        """
        Callback for a subscription response.

        Nothing is reported unless ``error`` carries a non-zero ErrorID.
        """
        if not error or not error["ErrorID"]:
            return
        error["detail"] = "行情订阅失败"
        self.on_event(type=EVENT_ERROR, data=error)

    def onRtnDepthMarketData(self, data: dict):
        """
        Callback of tick data update; prints the raw data dict.
        """
        print(data)

    def connect(self, info: dict):
        """
        Start connection to server.

        ``info`` must contain ``userid``, ``password``, ``brokerid`` and
        ``md_address``.
        """
        self.userid = info["userid"]
        self.password = info["password"]
        self.brokerid = info["brokerid"]

        # If not connected, then start connection first.
        if not self.connect_status:
            path = get_folder_path(f"{self.gateway_name.lower()}/{self.userid}")
            # Built with pathlib rather than a hard-coded "\\" so the
            # path is also valid on non-Windows systems.
            self.createFtdcMdApi(str(path.joinpath("Md")))
            self.registerFront(info["md_address"])
            self.init()
        # If already connected, then login immediately.
        elif not self.login_status:
            self.login()

    def login(self):
        """
        Login onto server with the stored credentials.
        """
        req = {
            "UserID": self.userid,
            "Password": self.password,
            "BrokerID": self.brokerid,
        }

        self.reqid += 1
        self.reqUserLogin(req, self.reqid)

    def subscribe(self, symbol):
        """
        Subscribe to tick data update.

        The symbol is always remembered; the request itself is only sent
        when already logged in (otherwise onRspUserLogin sends it later).
        Returns the result of ``subscribeMarketData`` or ``None`` when no
        request was sent.
        """
        result = None
        if self.login_status:
            result = self.subscribeMarketData(symbol)
        self.subscribed.add(symbol)
        return result

    def close(self):
        """
        Close the connection if it is currently up.
        """
        if self.connect_status:
            self.exit()


def run_login():
    """
    Connect to the market-data front, subscribe to ``ag1912`` and then
    loop forever, printing a separator line once per second.

    ``userid`` and ``password`` are empty in ``info`` and must be filled
    in before running.
    """
    gateway = BeeMdApi()
    info = {
        "CONNECT_INFO": {
            "userid": "",
            "password": "",
            "brokerid": "9999",
            "md_address": "tcp://180.168.146.187:10131",
            "td_address": "tcp://180.168.146.187:10130",
            # Alternative front addresses:
            # "md_address": "tcp://218.202.237.33:10112",
            # "td_address": "tcp://218.202.237.33:10102",
            "product_info": "",
            "appid": "simnow_client_test",
            "auth_code": "0000000000000000",
        },
        "INTERFACE": "ctp",
        "TD_FUNC": True,
        "MD_FUNC": True,
    }
    gateway.connect(info["CONNECT_INFO"])
    gateway.subscribe("ag1912")

    # Keep the process running; tick data is printed by
    # onRtnDepthMarketData whenever that callback is invoked.
    while True:
        sleep(1)
        print("-" * 30)


# Guarded so importing this module does not start the endless loop.
if __name__ == "__main__":
    run_login()
成果展示:
可参考:
https://github.com/vnpy/vnpy/blob/master/examples/notebook_trading/demo_notebook.ipynb