蜗牛博客VNPY源码学习系列文章:
VNPY源码(一)CTP封装及K线合成
VNPY源码(二)API获取行情和script_trader
VNPY源码(三)主引擎MainEngine
VNPY源码(四)DataRecorder
VNPY源码(五)CtaEngine实盘引擎
VNPY源码(六)BacktesterEngine回测引擎
VNPY源码(七)限价单与停止单
VNPY源码(八)VNPY的数据流
MainEngine是VN Trader的核心,负责所有引擎的实例化。
一、源码
位于C:\vnstudio\Lib\site-packages\vnpy\trader\engine.py下面。这个文件下面有:MainEngine、BaseEngine(ABC)、LogEngine(BaseEngine)、OmsEngine(BaseEngine)、EmailEngine(BaseEngine)共5个Engine。我们先学习MainEngine。
所有的gateway都放在self.gateways字典里面,对应vnpy UI界面的连接菜单的内容。
Subscribe逻辑:
1.add_gateway生成一个self.gateways字典
2.调用get_gateway函数取出CtpGateway实例
3.调用CtpGateway实例的subscribe函数
4.底层API通过symbol,exchange的形式subscribe
class MainEngine:
    """
    Acts as the core of VN Trader.

    Holds the event engine plus name-keyed registries of gateways,
    function engines and apps, and forwards trading requests to the
    gateway selected by name.
    """

    def __init__(self, event_engine: EventEngine = None):
        """
        Create the main engine.

        Uses the given event engine, or creates a new EventEngine when
        none is passed. The event engine is started, the working
        directory is changed to TRADER_DIR and the built-in function
        engines are registered.
        """
        if event_engine:
            self.event_engine = event_engine
        else:
            self.event_engine = EventEngine()
        self.event_engine.start()

        self.gateways = {}      # gateway_name -> gateway instance
        self.engines = {}       # engine_name -> engine instance
        self.apps = {}          # app_name -> app instance
        self.exchanges = []     # exchanges collected from added gateways

        os.chdir(TRADER_DIR)    # Change working directory
        self.init_engines()     # Initialize function engines

    def add_engine(self, engine_class: Any):
        """
        Add function engine.

        Instantiates engine_class with this main engine and the event
        engine, stores it under its engine_name and returns it.
        """
        engine = engine_class(self, self.event_engine)
        self.engines[engine.engine_name] = engine
        return engine

    def add_gateway(self, gateway_class: Type[BaseGateway]):
        """
        Add gateway.

        Instantiates the given gateway class (e.g. CtpGateway) with the
        event engine, stores the instance in self.gateways under its
        gateway_name and returns the instance.
        """
        # gateway_class is a concrete gateway such as CtpGateway; it is
        # constructed with the event engine held by this main engine.
        gateway = gateway_class(self.event_engine)

        # Keyed by gateway_name so that get_gateway() can look it up later.
        self.gateways[gateway.gateway_name] = gateway

        # Add gateway supported exchanges into engine.
        # NOTE(review): `exchanges` is presumably a class-level list on the
        # gateway -- confirm against the gateway implementation.
        for exchange in gateway.exchanges:
            # Only record exchanges that are not already known.
            if exchange not in self.exchanges:
                self.exchanges.append(exchange)

        return gateway

    def add_app(self, app_class: Type[BaseApp]):
        """
        Add app.

        Instantiates the app, stores it under its app_name, registers the
        app's engine class as a function engine and returns that engine.
        """
        app = app_class()
        self.apps[app.app_name] = app

        engine = self.add_engine(app.engine_class)
        return engine

    def init_engines(self):
        """
        Init all engines.

        Registers the built-in LogEngine, OmsEngine and EmailEngine.
        """
        self.add_engine(LogEngine)
        self.add_engine(OmsEngine)
        self.add_engine(EmailEngine)

    def write_log(self, msg: str, source: str = ""):
        """
        Put log event with specific message.

        `source` is passed to LogData as its gateway_name.
        """
        # NOTE(review): per the article, LogData inherits gateway_name from
        # BaseData (not visible here), which is why it accepts this keyword.
        log = LogData(msg=msg, gateway_name=source)
        event = Event(EVENT_LOG, log)
        self.event_engine.put(event)

    def get_gateway(self, gateway_name: str):
        """
        Return gateway object by name.

        Looks the name up in self.gateways. When nothing is found a log
        message is written and the (falsy) lookup result is returned.
        """
        gateway = self.gateways.get(gateway_name, None)
        if not gateway:
            self.write_log(f"找不到底层接口:{gateway_name}")
        return gateway

    def get_engine(self, engine_name: str):
        """
        Return engine object by name.

        When nothing is found a log message is written and the (falsy)
        lookup result is returned.
        """
        engine = self.engines.get(engine_name, None)
        if not engine:
            self.write_log(f"找不到引擎:{engine_name}")
        return engine

    def get_default_setting(self, gateway_name: str):
        """
        Get default setting dict of a specific gateway.

        Returns None when the gateway is not found.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            return gateway.get_default_setting()
        return None

    def get_all_gateway_names(self):
        """
        Get all names of gateways added in main engine.
        """
        return list(self.gateways.keys())

    def get_all_apps(self):
        """
        Get all app objects.
        """
        return list(self.apps.values())

    def get_all_exchanges(self):
        """
        Get all exchanges.

        Returns the engine's own list object, not a copy.
        """
        return self.exchanges

    def connect(self, setting: dict, gateway_name: str):
        """
        Start connection of a specific gateway.

        Does nothing (beyond the log written by get_gateway) when the
        gateway is not found.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            gateway.connect(setting)

    def subscribe(self, req: SubscribeRequest, gateway_name: str):
        """
        Subscribe tick data update of a specific gateway.

        Looks up the gateway instance by name and, if found, passes the
        request to its subscribe() method.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            # NOTE(review): per the article, CtpGateway.subscribe forwards
            # to its md_api.subscribe(req) -- the underlying market-data
            # API; that code is not visible here.
            gateway.subscribe(req)

    def send_order(self, req: OrderRequest, gateway_name: str):
        """
        Send new order request to a specific gateway.

        Returns whatever the gateway's send_order() returns, or an empty
        string when the gateway is not found.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            return gateway.send_order(req)
        else:
            return ""

    def cancel_order(self, req: CancelRequest, gateway_name: str):
        """
        Send cancel order request to a specific gateway.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            gateway.cancel_order(req)

    def send_orders(self, reqs: Sequence[OrderRequest], gateway_name: str):
        """
        Send a batch of new order requests to a specific gateway.

        Returns whatever the gateway's send_orders() returns, or a list
        with one empty string per request when the gateway is not found.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            return gateway.send_orders(reqs)
        else:
            return ["" for _ in reqs]

    def cancel_orders(self, reqs: Sequence[CancelRequest], gateway_name: str):
        """
        Send a batch of cancel order requests to a specific gateway.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            gateway.cancel_orders(reqs)

    def query_history(self, req: HistoryRequest, gateway_name: str):
        """
        Send query history request to a specific gateway.

        Returns whatever the gateway's query_history() returns, or None
        when the gateway is not found.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            return gateway.query_history(req)
        else:
            return None

    def close(self):
        """
        Make sure every gateway and app is closed properly before
        programme exit.
        """
        # Stop event engine first to prevent new timer event.
        self.event_engine.stop()

        for engine in self.engines.values():
            engine.close()

        for gateway in self.gateways.values():
            gateway.close()
二、利用MainEngine
1.订阅行情
其实关键的代码只有4行,但是有一个地方需要注意,就是SETTINGS的语句必须要写,不然在cmd窗口打印不出信息。
# Minimal script: create a MainEngine, add the CTP gateway and connect.
from vnpy.event import EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.gateway.ctp.ctp_gateway import CtpGateway
from vnpy.trader.setting import SETTINGS
from logging import INFO

# Logging switches. According to the accompanying article these must be
# set, otherwise nothing is printed in the cmd window.
SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True

# Connection settings handed to the gateway's connect(). The keys are the
# Chinese field labels; user name and password are left blank here and
# must be filled in.
# NOTE(review): the addresses look like a SimNow test environment, and the
# market-data address ends with ';' -- confirm both before use.
setting = {
    "用户名": "",
    "密码": "",
    "经纪商代码": "9999",
    "交易服务器": "180.168.146.187:10130",
    "行情服务器": "180.168.146.187:10131;",
    "产品名称": "simnow_client_test",
    "授权编码": "0000000000000000",
    "产品信息": "11111"
}

# The four key lines: build the engines, register the gateway class and
# connect through the gateway registered under the name "CTP".
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
main_engine.connect(setting, "CTP")
成果展示:
2.打印Tick
# Script: connect to CTP, subscribe to one contract and print every tick.
from vnpy.event import EventEngine, Event
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.gateway.ctp.ctp_gateway import CtpGateway
from vnpy.trader.event import EVENT_LOG, EVENT_TICK
from vnpy.trader.object import SubscribeRequest, ContractData
from vnpy.trader.constant import Exchange

# Connection settings handed to the gateway's connect(). User name and
# password are left blank here and must be filled in.
# NOTE(review): the market-data address ends with ';' -- confirm.
setting = {
    "用户名": "",
    "密码": "",
    "经纪商代码": "9999",
    "交易服务器": "180.168.146.187:10130",
    "行情服务器": "180.168.146.187:10131;",
    "产品名称": "simnow_client_test",
    "授权编码": "0000000000000000",
    "产品信息": "11111"
}


def process_tick_event(event: Event):
    """Print the payload of a tick event followed by a separator line."""
    tick = event.data
    print(tick)
    print("--" * 40)


# Contract whose ticks should be printed.
# NOTE(review): name/product are filled with the placeholder "SHFE";
# confirm the expected types against ContractData.
contract = ContractData(
    symbol="zn1910",
    exchange=Exchange("SHFE"),
    gateway_name="CTP",
    name="SHFE",
    product="SHFE",
    size=100,
    pricetick=2.0,
)

# MainEngine.subscribe() is annotated to take a SubscribeRequest, so build
# one from the contract instead of passing the ContractData itself.
req = SubscribeRequest(symbol=contract.symbol, exchange=contract.exchange)

event_engine = EventEngine()
event_engine.register(EVENT_TICK, process_tick_event)  # register handler

main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
main_engine.connect(setting, "CTP")
main_engine.subscribe(req, "CTP")  # subscribe to market data
成果展示
小知识:
其他的用法:
def process_log_event(event: Event):
    """Print the time and message of a log event, separated by a tab."""
    log = event.data
    print(f"{log.time}\t{log.msg}")


# Register the handler above for log events on an existing event engine.
event_engine.register(EVENT_LOG, process_log_event)

# Further registration examples. These lines use `self`, so they are
# excerpts from inside a class that holds an event engine and defines the
# referenced process_* methods; they are not runnable at module level.
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
三、关于SubscribeRequest
小知识:
__post_init__是dataclass提供的钩子方法:@dataclass自动生成的__init__在完成各字段赋值之后会自动调用它,所以必须加上@dataclass才起作用。下面的例子用它把symbol和exchange拼接成vt_symbol,相当于合约代码的合成。
# Demo: how a dataclass's __post_init__ derives vt_symbol from its fields.
from dataclasses import dataclass

# SubscribeRequest is deliberately NOT imported from vnpy.trader.object
# here: the demo defines its own class of that name below, which would
# shadow the imported one.
from vnpy.trader.object import ContractData
from vnpy.trader.constant import Exchange

# A contract used only as the source of the exchange value below.
contract = ContractData(
    symbol="zn1910",
    exchange=Exchange("SHFE"),
    gateway_name="CTP",
    name="SHFE",
    product="SHFE",
    size=100,
    pricetick=2.0,
)


@dataclass
class SubscribeRequest:
    """
    Request sending to specific gateway for subscribing tick data update.
    """

    symbol: str
    exchange: Exchange

    def __post_init__(self):
        """Build vt_symbol as "<symbol>.<exchange value>" after init."""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"


request = SubscribeRequest(symbol="600031", exchange=contract.exchange)
print(request.symbol)
print(request.vt_symbol)
执行结果:
600031
600031.SHFE
四、数据流程
CtpMdApi和CtpTdApi中的Md和Td分别是什么的缩写啊
MarketData和Trade
https://blog.csdn.net/u011331731/article/details/88946916