Home >  > VNPY源码(三)主引擎MainEngine及SubscribeRequest

VNPY源码(三)主引擎MainEngine及SubscribeRequest

0

蜗牛博客VNPY源码学习系列文章:

VNPY源码(一)CTP封装及K线合成
VNPY源码(二)API获取行情和script_trader
VNPY源码(三)主引擎MainEngine
VNPY源码(四)DataRecorder
VNPY源码(五)CtaEngine实盘引擎
VNPY源码(六)BacktesterEngine回测引擎
VNPY源码(七)限价单与停止单
VNPY源码(八)VNPY的数据流

负责所有引擎的实例化。

一、源码

位于C:\vnstudio\Lib\site-packages\vnpy\trader\engine.py下面。这个文件下面有:MainEngine、BaseEngine(ABC)、LogEngine(BaseEngine)、OmsEngine(BaseEngine)、EmailEngine(BaseEngine)共5个Engine。我们先学习MainEngine。

所有的gateway都放在self.gateways字典里面,对应vnpy UI界面的连接菜单的内容。

Subscribe逻辑:
1.add_gateway生成一个self.gateways字典
2.调用get_gateway函数取出CtpGateway实例
3.调用CtpGateway实例的subscribe函数
4.底层API通过sambol,exchange的形式subscribe

class MainEngine:
    """
    Acts as the core of VN Trader.
    """

    def __init__(self, event_engine: EventEngine = None):
        """"""
        if event_engine:
            self.event_engine = event_engine
        else:
            self.event_engine = EventEngine()
        self.event_engine.start()

        self.gateways = {}
        self.engines = {}
        self.apps = {}
        self.exchanges = []

        os.chdir(TRADER_DIR)    # Change working directory
        self.init_engines()     # Initialize function engines

    def add_engine(self, engine_class: Any):
        """
        Add function engine.
        """
        engine = engine_class(self, self.event_engine)
        self.engines[engine.engine_name] = engine
        return engine

    def add_gateway(self, gateway_class: Type[BaseGateway]):
        """
        Add gateway. 这个函数的作用是传入CTPGateway,将传入的CTPGateway实例化(将event_engine作为参数,将存入self.gateways这个字典中,最后返回CTPGateway实例)
        """
		#这里得到一个gateway_class(是CTPGateway之类,不是BaseGateway)的实例,实例的参数是init MainEngine的时候传入的event_engine
        gateway = gateway_class(self.event_engine)
		#调用上面的实例的gateway_name属性,并作为字典的键
		#这里得到了gateways字典,在下面的get_gateway函数要用,取出gateway。
        self.gateways[gateway.gateway_name] = gateway

        # Add gateway supported exchanges into engine
		#取出gateway的exchanges类属性(列表,非实例属性),
        for exchange in gateway.exchanges:
			#如果类的exchanges,不在当前实例的exchanges里面,则添加进来。
            if exchange not in self.exchanges:
                self.exchanges.append(exchange)

		#返回CTPGateway
        return gateway

    def add_app(self, app_class: Type[BaseApp]):
        """
        Add app.
        """
        app = app_class()
        self.apps[app.app_name] = app

        engine = self.add_engine(app.engine_class)
        return engine

    def init_engines(self):
        """
        Init all engines.
        """
        self.add_engine(LogEngine)
        self.add_engine(OmsEngine)
        self.add_engine(EmailEngine)

    def write_log(self, msg: str, source: str = ""):
        """
        Put log event with specific message.
        """
		#LogData继承自BaseData,BaseData有gateway_name,所以这里可以传gateway_name,得到LogData对象。
        log = LogData(msg=msg, gateway_name=source)
        event = Event(EVENT_LOG, log)
        self.event_engine.put(event)

    def get_gateway(self, gateway_name: str):
        """
        Return gateway object by name.作用是传入CtpGateway,从字典中取出CtpGateway实例,再返回这个实例
        """
        gateway = self.gateways.get(gateway_name, None)
        if not gateway:
            self.write_log(f"找不到底层接口:{gateway_name}")
		#返回CtpGateway的实例
        return gateway

    def get_engine(self, engine_name: str):
        """
        Return engine object by name.
        """
        engine = self.engines.get(engine_name, None)
        if not engine:
            self.write_log(f"找不到引擎:{engine_name}")
        return engine

    def get_default_setting(self, gateway_name: str):
        """
        Get default setting dict of a specific gateway.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            return gateway.get_default_setting()
        return None

    def get_all_gateway_names(self):
        """
        Get all names of gatewasy added in main engine.
        """
        return list(self.gateways.keys())

    def get_all_apps(self):
        """
        Get all app objects.
        """
        return list(self.apps.values())

    def get_all_exchanges(self):
        """
        Get all exchanges.
        """
        return self.exchanges

    def connect(self, setting: dict, gateway_name: str):
        """
        Start connection of a specific gateway.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            gateway.connect(setting)

    def subscribe(self, req: SubscribeRequest, gateway_name: str):
        """
        Subscribe tick data update of a specific gateway.根据传入的CtpGateway,调用get_gateway函数取出CtpGateway实例,然后订阅行情。
        """
		#得到CTPGateway实例
        gateway = self.get_gateway(gateway_name)
        if gateway:
			#调用CTPGateway实例的subscribe方法,而self.md_api.subscribe(req)的方法就是self.md_api.subscribe(req),即底层API,而传入的参数是SubscribeRequest(一个类),应该是{self.symbol}.{self.exchange.value}这样的形式。
            gateway.subscribe(req)

    def send_order(self, req: OrderRequest, gateway_name: str):
        """
        Send new order request to a specific gateway.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            return gateway.send_order(req)
        else:
            return ""

    def cancel_order(self, req: CancelRequest, gateway_name: str):
        """
        Send cancel order request to a specific gateway.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            gateway.cancel_order(req)

    def send_orders(self, reqs: Sequence[OrderRequest], gateway_name: str):
        """
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            return gateway.send_orders(reqs)
        else:
            return ["" for req in reqs]

    def cancel_orders(self, reqs: Sequence[CancelRequest], gateway_name: str):
        """
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            gateway.cancel_orders(reqs)

    def query_history(self, req: HistoryRequest, gateway_name: str):
        """
        Send cancel order request to a specific gateway.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            return gateway.query_history(req)
        else:
            return None

    def close(self):
        """
        Make sure every gateway and app is closed properly before
        programme exit.
        """
        # Stop event engine first to prevent new timer event.
        self.event_engine.stop()

        for engine in self.engines.values():
            engine.close()

        for gateway in self.gateways.values():
            gateway.close()

二、利用Maniengine
1.订阅行情
其实关键的代码只有4行,但是有一个地方需要注意,就是SETTINGS的语句必须要写,不然在cmd窗口打印不出信息。

from vnpy.event import EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.gateway.ctp.ctp_gateway import CtpGateway
from vnpy.trader.setting import SETTINGS
from logging import INFO


SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True


setting = {
    "用户名": "",
    "密码": "",
    "经纪商代码": "9999",
    "交易服务器": "180.168.146.187:10130",
    "行情服务器": "180.168.146.187:10131;",
    "产品名称": "simnow_client_test",
    "授权编码": "0000000000000000",
    "产品信息": "11111"
}

event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
main_engine.connect(setting, "CTP")

成果展示:

2.打印Tick

from vnpy.event import EventEngine,Event
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.gateway.ctp.ctp_gateway import CtpGateway
from vnpy.trader.event import EVENT_LOG,EVENT_TICK
from vnpy.trader.object import SubscribeRequest,ContractData
from vnpy.trader.constant import Exchange

setting = {
    "用户名": "",
    "密码": "",
    "经纪商代码": "9999",
    "交易服务器": "180.168.146.187:10130",
    "行情服务器": "180.168.146.187:10131;",
    "产品名称": "simnow_client_test",
    "授权编码": "0000000000000000",
    "产品信息": "11111"
}

def process_tick_event(event: Event):
    """"""
    tick = event.data
    print(tick)
    print("--"*40)

contract = ContractData(symbol="zn1910",
	             exchange=Exchange("SHFE"),
	             gateway_name="CTP",
	             name="SHFE",
	             product="SHFE",
	             size=100,
	             pricetick=2.0,
	             )

event_engine = EventEngine()
event_engine.register(EVENT_TICK, process_tick_event)  #注册事件
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
main_engine.connect(setting, "CTP")
main_engine.subscribe(contract,"CTP")  #订阅行情

成果展示

小知识:
其他的用法:

def process_log_event(event: Event):
    """"""
    log = event.data
    print(f"{log.time}\t{log.msg}")

event_engine.register(EVENT_LOG, process_log_event)

self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)

三、关于SubscribeRequest
小知识:
__post_init__就是初始化的意思吧,但是必须加上@dataclass才起作用。下面的例子相当于合约的合成。

from dataclasses import dataclass
from vnpy.trader.object import SubscribeRequest,ContractData
from vnpy.trader.constant import Exchange

contract = ContractData(symbol="zn1910",
	             exchange=Exchange("SHFE"),
	             gateway_name="CTP",
	             name="SHFE",
	             product="SHFE",
	             size=100,
	             pricetick=2.0,
	             )


@dataclass
class SubscribeRequest:
    """
    Request sending to specific gateway for subscribing tick data update.
    """

    symbol: str
    exchange: Exchange

    def __post_init__(self):
        """"""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"


a = SubscribeRequest(symbol="600031",exchange=contract.exchange)
print(a.symbol) 
print(a.vt_symbol)

执行结果:

600031
600031.SHFE

四、数据流程
CtpMdApi和CtpTdApi中的Md和Td分别是什么的缩写啊
MarketData和Trade
https://blog.csdn.net/u011331731/article/details/88946916

暧昧帖

本文暂无标签

发表评论

*

*