Home >  > VNPY源码(二)API获取行情和script_trader

VNPY源码(二)API获取行情和script_trader

0

蜗牛博客VNPY源码学习系列文章:

VNPY源码(一)CTP封装及K线合成
VNPY源码(二)API获取行情和script_trader
VNPY源码(三)主引擎MainEngine
VNPY源码(四)DataRecorder
VNPY源码(五)CtaEngine实盘引擎
VNPY源码(六)BacktesterEngine回测引擎
VNPY源码(七)限价单与停止单
VNPY源码(八)VNPY的数据流

一、init_cli_trading
init_cli_trading 定义在 C:\vnstudio\Lib\site-packages\vnpy\app\script_trader 目录下的 cli.py 文件中。该目录下还有其他文件,但只需通过 from vnpy.app.script_trader import init_cli_trading 即可直接导入该函数。

在C:\vnstudio\Lib\site-packages\vnpy\gateway\ctptest新建test.py文件。

from vnpy.gateway.ctp.ctp_gateway import CtpGateway
from vnpy.app.script_trader import init_cli_trading

# Connection settings for the CTP gateway; this dict is handed to
# engine.connect_gateway() below. The user name and password appear to be
# masked placeholders -- replace them with real account credentials.
setting = {
    "用户名": "1xx11",
    "密码": "axx4",
    "经纪商代码": "9999",
    "交易服务器": "180.168.146.187:10130",
    # NOTE(review): the trailing ";" looks like a typo -- confirm the gateway accepts it.
    "行情服务器": "180.168.146.187:10131;",
    "产品名称": "simnow_client_test",
    "授权编码": "0000000000000000",
    "产品信息": "11111"
}
# Create the script-trading engine with the CTP gateway class, then connect
# with the settings above under the gateway name "CTP".
engine = init_cli_trading([CtpGateway])
engine.connect_gateway(setting,"CTP")

成果展示:

然后在 Jupyter 中运行以下命令:

# Query all contracts (use_df=True presumably returns a pandas DataFrame -- confirm)
engine.get_all_contracts(use_df=True)

其他命令:

# Query account funds
engine.get_all_accounts(use_df=True)

# Query positions
engine.get_all_positions(use_df=True)

# Query active (working) orders
engine.get_all_active_orders(use_df=True)



# Subscribe to market data
engine.subscribe(["zn1910.SHFE"])

# Query the latest tick
engine.get_tick("zn1910.SHFE", use_df=True)

# Send a buy order; the returned order id is kept for the query/cancel calls below
vt_orderid = engine.buy("zn1910.SHFE", 32, 100)
print(vt_orderid)

# Query a specific order
engine.get_order(vt_orderid)

# Cancel the order
engine.cancel_order(vt_orderid)

二、利用vnpy获取行情

from pathlib import Path
from threading import Thread
from time import sleep

from vnpy.api.ctp.vnctpmd import MdApi


# Event type identifiers. They are passed as the ``type`` argument of
# BeeMdApi.on_event; in the code visible here only EVENT_LOG and EVENT_ERROR
# are actually used.
EVENT_LOG = "log"
EVENT_CONTRACT = "contract"
EVENT_TICK = "tick"
EVENT_BAR = "bar"
EVENT_ERROR = "error"
EVENT_POSITION = "position"
EVENT_TRADE = "trade"
EVENT_ORDER = "order"
EVENT_ACCOUNT = "account"
EVENT_SHARED = "shared"
EVENT_LAST = "last"
EVENT_INIT_FINISHED = "init"


def _get_trader_dir(temp_name: str):
    """
    Locate the directory the trader runs in and its temp folder.

    A folder called ``temp_name`` inside the current working directory takes
    precedence. If there is none, the folder inside the user's home directory
    is used and created when it does not exist yet.

    Returns a ``(base_dir, temp_dir)`` tuple of ``Path`` objects.
    """
    working_dir = Path.cwd()
    local_candidate = working_dir / temp_name
    if local_candidate.exists():
        # A temp folder next to the running script wins.
        return working_dir, local_candidate

    # Fall back to the home directory of the current user.
    user_home = Path.home()
    home_candidate = user_home / temp_name
    if home_candidate.exists():
        return user_home, home_candidate

    home_candidate.mkdir()
    return user_home, home_candidate


def get_folder_path(folder_name: str):
    """
    Return the path of ``folder_name`` inside the ".ctpbee" temp directory.

    The folder is created when it does not exist yet. ``folder_name`` may be
    a nested relative path such as "ctp/<userid>"; missing intermediate
    folders are created as well.

    Returns the folder as a ``Path``.
    """
    _, temp_dir = _get_trader_dir(".ctpbee")
    folder_path = temp_dir.joinpath(folder_name)
    if not folder_path.exists():
        # parents=True: a nested name would otherwise raise FileNotFoundError
        # when the intermediate folder is missing.
        # exist_ok=True: tolerate the folder appearing between the check and
        # the creation.
        folder_path.mkdir(parents=True, exist_ok=True)
    return folder_path


class BeeMdApi(MdApi):
    """
    Minimal market-data client built on the ``MdApi`` base class.

    It tracks connection and login state, remembers every requested symbol so
    it can be subscribed once a login succeeds, and prints each event and
    tick it receives.
    """

    def __init__(self):
        """Initialise state flags, the subscription set and empty credentials."""
        super().__init__()

        self.gateway_name = "ctp"

        self.reqid = 0                # incremented before every login request
        self.connect_status = False   # toggled by the front (dis)connect callbacks
        self.login_status = False     # set once a login response reports no error
        self.subscribed = set()       # symbols requested through subscribe()

        # Credentials are filled in by connect().
        self.userid = ""
        self.password = ""
        self.brokerid = 0

    @property
    def md_status(self):
        """Return the current login state (``True`` once logged in)."""
        return self.login_status

    def on_event(self, type, data):
        """
        Report an event by printing it.

        ``type`` is one of the EVENT_* identifiers and ``data`` is the
        payload (a message string or an error dict in this class).
        """
        print(f"事件类型{type}, data={data}")

    def onFrontConnected(self):
        """
        Callback when front server is connected.

        Marks the connection as up, logs it and starts the login.
        """
        self.connect_status = True
        self.on_event(type=EVENT_LOG, data="行情服务器连接成功")
        self.login()

    def onFrontDisconnected(self, reason: int):
        """
        Callback when front server is disconnected.

        Clears both the connection and the login flag and logs ``reason``.
        """
        self.connect_status = False
        self.login_status = False
        self.on_event(type=EVENT_LOG, data=f"行情连接断开,原因{reason}")

    def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool):
        """
        Callback when user is logged in.

        On success the login flag is set and every remembered symbol is
        subscribed. On failure the error dict is tagged with a detail
        message and reported as an error event.
        """
        # An empty/missing error is treated as success, matching
        # onRspSubMarketData; indexing it blindly would raise.
        if not error or not error["ErrorID"]:
            self.login_status = True
            self.on_event(type=EVENT_LOG, data="行情服务器登录成功")

            for symbol in self.subscribed:
                self.subscribeMarketData(symbol)
        else:
            error["detail"] = "行情登录失败"
            self.on_event(type=EVENT_ERROR, data=error)

    def onRspError(self, error: dict, reqid: int, last: bool):
        """
        Callback when error occurred.

        Tags the error dict with a detail message and reports it.
        """
        error['detail'] = "行情接口报错"
        self.on_event(type=EVENT_ERROR, data=error)

    def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool):
        """
        Callback for a subscription response.

        Nothing happens unless the response carries a non-zero ``ErrorID``,
        in which case the error is tagged and reported.
        """
        if not error or not error["ErrorID"]:
            return
        error['detail'] = "行情订阅失败"
        self.on_event(type=EVENT_ERROR, data=error)

    def onRtnDepthMarketData(self, data: dict):
        """
        Callback of tick data update.

        The raw tick dict is printed as-is.
        """
        print(data)

    def connect(self, info: dict):
        """
        Start connection to server.

        ``info`` must provide the keys "userid", "password", "brokerid" and
        "md_address". When no connection exists yet the API is created and
        initialised; when connected but not logged in, only the login is
        repeated.
        """
        self.userid = info['userid']
        self.password = info['password']
        self.brokerid = info['brokerid']

        # If not connected, then start connection first.
        if not self.connect_status:
            path = get_folder_path(self.gateway_name.lower() + f"/{self.userid}")
            # Join with pathlib so the "Md" prefix uses the platform separator
            # rather than a hard-coded Windows backslash.
            self.createFtdcMdApi(str(path.joinpath("Md")))
            self.registerFront(info['md_address'])
            self.init()
        # If already connected, then login immediately.
        elif not self.login_status:
            self.login()

    def login(self):
        """
        Login onto server.

        Sends a login request built from the stored credentials with a
        freshly incremented request id.
        """
        req = {
            "UserID": self.userid,
            "Password": self.password,
            "BrokerID": self.brokerid
        }

        self.reqid += 1
        self.reqUserLogin(req, self.reqid)

    def subscribe(self, symbol):
        """
        Subscribe to tick data update.

        The symbol is always remembered. The request is only sent right away
        when already logged in; otherwise it goes out from the login
        callback. Returns the result of ``subscribeMarketData`` or ``None``
        when nothing was sent.
        """
        result = None
        if self.login_status:
            result = self.subscribeMarketData(symbol)
        self.subscribed.add(symbol)
        return result

    def close(self):
        """
        Close the connection.

        Calls ``exit()`` only when a connection is currently established.
        """
        if self.connect_status:
            self.exit()


def run_login():
    """
    Connect a BeeMdApi client, subscribe to "ag1912" and keep running.

    The function never returns normally: it prints a separator line once per
    second while callbacks print incoming data. When the loop is interrupted
    (for example by Ctrl+C) or an error occurs, the client is closed before
    the exception propagates.
    """
    gateway = BeeMdApi()
    info = {
        "CONNECT_INFO": {
            "userid": "",
            "password": "",
            "brokerid": "9999",
            "md_address": "tcp://180.168.146.187:10131",
            "td_address": "tcp://180.168.146.187:10130",
            # Alternative front addresses:
            # "md_address": "tcp://218.202.237.33:10112",
            # "td_address": "tcp://218.202.237.33:10102",
            "product_info": "",
            "appid": "simnow_client_test",
            "auth_code": "0000000000000000",
        },
        "INTERFACE": "ctp",
        "TD_FUNC": True,
        "MD_FUNC": True,
    }
    try:
        # Only the CONNECT_INFO section is consumed by BeeMdApi.connect.
        gateway.connect(info["CONNECT_INFO"])

        gateway.subscribe("ag1912")
        while True:
            sleep(1)
            print("-"*30)
    finally:
        # Release the API instead of leaving the connection open on exit.
        gateway.close()

# Only start the endless market-data loop when executed as a script (or in a
# notebook), not when this module is imported.
if __name__ == "__main__":
    run_login()

成果展示:

可参考:
https://github.com/vnpy/vnpy/blob/master/examples/notebook_trading/demo_notebook.ipynb

暧昧帖

本文暂无标签

发表评论

*

*