VNPY源码学习系列文章:
VNPY源码(一)CTP封装及K线合成
VNPY源码(二)API获取行情和script_trader
VNPY源码(三)主引擎MainEngine
VNPY源码(四)DataRecorder
VNPY源码(五)CtaEngine实盘引擎
VNPY源码(六)BacktesterEngine回测引擎
VNPY源码(七)限价单与停止单
VNPY源码(八)VNPY的数据流
一、源码
""" 注册EVENT_TICK、EVENT_CONTRACT,当有EVENT_TICK的时候,调用process_contract_event函数(其实就是record_tick函数), 将task put到queue 通过run函数,从self.queue获得task(Tick、Bar),调用database_manager的方法储存数据 """ from threading import Thread from queue import Queue, Empty from copy import copy from vnpy.event import Event, EventEngine from vnpy.trader.engine import BaseEngine, MainEngine from vnpy.trader.object import ( SubscribeRequest, TickData, BarData, ContractData ) from vnpy.trader.event import EVENT_TICK, EVENT_CONTRACT from vnpy.trader.utility import load_json, save_json, BarGenerator from vnpy.trader.database import database_manager APP_NAME = "DataRecorder" EVENT_RECORDER_LOG = "eRecorderLog" EVENT_RECORDER_UPDATE = "eRecorderUpdate" class RecorderEngine(BaseEngine): """""" setting_filename = "data_recorder_setting.json" def __init__(self, main_engine: MainEngine, event_engine: EventEngine): """""" super().__init__(main_engine, event_engine, APP_NAME) self.queue = Queue() self.thread = Thread(target=self.run) self.active = False self.tick_recordings = {} self.bar_recordings = {} self.bar_generators = {} self.load_setting() self.register_event() self.start() self.put_event() def load_setting(self): """""" setting = load_json(self.setting_filename) self.tick_recordings = setting.get("tick", {}) self.bar_recordings = setting.get("bar", {}) def save_setting(self): """""" setting = { "tick": self.tick_recordings, "bar": self.bar_recordings } save_json(self.setting_filename, setting) def run(self): """ 调用database_manager的方法储存数据 """ while self.active: try: task = self.queue.get(timeout=1) task_type, data = task if task_type == "tick": database_manager.save_tick_data([data]) elif task_type == "bar": database_manager.save_bar_data([data]) except Empty: continue def close(self): """""" self.active = False if self.thread.isAlive(): self.thread.join() def start(self): """""" self.active = True self.thread.start() def add_bar_recording(self, vt_symbol: str): """ 将symbol数据写入bar_recordings["symbol"]这个字典,订阅合约,调用save_setting进行保存(里面有tick_recordings,bar_recordings字典), 然后调用put_event(实质为调用Eventengine的self._queue.put(event)),将事件放入队列。 """ if vt_symbol in self.bar_recordings: self.write_log(f"已在K线记录列表中:{vt_symbol}") return contract = self.main_engine.get_contract(vt_symbol) if not contract: self.write_log(f"找不到合约:{vt_symbol}") return self.bar_recordings[vt_symbol] = { "symbol": contract.symbol, "exchange": contract.exchange.value, "gateway_name": contract.gateway_name } self.subscribe(contract) self.save_setting() self.put_event() self.write_log(f"添加K线记录成功:{vt_symbol}") def add_tick_recording(self, vt_symbol: str): """""" if vt_symbol in self.tick_recordings: self.write_log(f"已在Tick记录列表中:{vt_symbol}") return contract = self.main_engine.get_contract(vt_symbol) if not contract: self.write_log(f"找不到合约:{vt_symbol}") return self.tick_recordings[vt_symbol] = { "symbol": contract.symbol, "exchange": contract.exchange.value, "gateway_name": contract.gateway_name } self.subscribe(contract) self.save_setting() self.put_event() self.write_log(f"添加Tick记录成功:{vt_symbol}") def remove_bar_recording(self, vt_symbol: str): """""" if vt_symbol not in self.bar_recordings: self.write_log(f"不在K线记录列表中:{vt_symbol}") return self.bar_recordings.pop(vt_symbol) self.save_setting() #调用下面的put_event方法,将EVENT_RECORDER_UPDATE放入队列 self.put_event() self.write_log(f"移除K线记录成功:{vt_symbol}") def remove_tick_recording(self, vt_symbol: str): """""" if vt_symbol not in self.tick_recordings: self.write_log(f"不在Tick记录列表中:{vt_symbol}") return self.tick_recordings.pop(vt_symbol) self.save_setting() self.put_event() self.write_log(f"移除Tick记录成功:{vt_symbol}") def register_event(self): """""" self.event_engine.register(EVENT_TICK, self.process_tick_event) self.event_engine.register(EVENT_CONTRACT, self.process_contract_event) def process_tick_event(self, event: Event): """ 调用下面的record_tick方法(其实就是将task put到queue) """ tick = event.data if tick.vt_symbol in self.tick_recordings: self.record_tick(tick) if tick.vt_symbol in self.bar_recordings: bg = self.get_bar_generator(tick.vt_symbol) bg.update_tick(tick) def process_contract_event(self, event: Event): """""" contract = event.data vt_symbol = contract.vt_symbol if (vt_symbol in self.tick_recordings or vt_symbol in self.bar_recordings): self.subscribe(contract) def write_log(self, msg: str): """""" event = Event( EVENT_RECORDER_LOG, msg ) self.event_engine.put(event) def put_event(self): """ 调用event_engine的put方法 """ tick_symbols = list(self.tick_recordings.keys()) tick_symbols.sort() bar_symbols = list(self.bar_recordings.keys()) bar_symbols.sort() data = { "tick": tick_symbols, "bar": bar_symbols } event = Event( EVENT_RECORDER_UPDATE, data ) self.event_engine.put(event) def record_tick(self, tick: TickData): """ 将task put到queue, 不过这里为什么要用self.queue,不是调用put_event? """ task = ("tick", copy(tick)) self.queue.put(task) def record_bar(self, bar: BarData): """""" task = ("bar", copy(bar)) self.queue.put(task) def get_bar_generator(self, vt_symbol: str): """ 从bar_generators这个字典通过symbol取出bg的实例 """ bg = self.bar_generators.get(vt_symbol, None) if not bg: bg = BarGenerator(self.record_bar) self.bar_generators[vt_symbol] = bg return bg def subscribe(self, contract: ContractData): """""" req = SubscribeRequest( symbol=contract.symbol, exchange=contract.exchange ) self.main_engine.subscribe(req, contract.gateway_name)
二、database_manager
在DataRecorder模块有下面的代码
from vnpy.trader.database import database_manager if task_type == "tick": database_manager.save_tick_data([data]) elif task_type == "bar": database_manager.save_bar_data([data])
可是在vnpy.trader.database下面没有找到这个database_manager,
只在init.py找到下面这句。
from vnpy.trader.database.database import BaseDatabaseManager database_manager: "BaseDatabaseManager" = init(settings=settings)
原来它调用的是initialize中的init方法,返回的是BaseDatabaseManager对象
它的save_tick_data方法是在C:\vnstudio\Lib\site-packages\vnpy\~rader\database\database.py文件中BaseDatabaseManager这个类定义的。
然后在上面的import中,将这个类import了进来。
@abstractmethod def save_tick_data( self, datas: Sequence["TickData"], ): pass
参考:https://www.vnpy.com/forum/topic/805-databaseyuan-ma-yue-du-bi-ji-+pei-zhi-jiao-cheng