Home >  > VNPY源码(四)DataRecorder

VNPY源码(四)DataRecorder

0

VNPY源码学习系列文章:

VNPY源码(一)CTP封装及K线合成
VNPY源码(二)API获取行情和script_trader
VNPY源码(三)主引擎MainEngine
VNPY源码(四)DataRecorder
VNPY源码(五)CtaEngine实盘引擎
VNPY源码(六)BacktesterEngine回测引擎
VNPY源码(七)限价单与停止单
VNPY源码(八)VNPY的数据流

一、源码

"""
注册EVENT_TICK、EVENT_CONTRACT,当有EVENT_TICK的时候,调用process_contract_event函数(其实就是record_tick函数),
将task put到queue
通过run函数,从self.queue获得task(Tick、Bar),调用database_manager的方法储存数据
"""

from threading import Thread
from queue import Queue, Empty
from copy import copy

from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.object import (
    SubscribeRequest,
    TickData,
    BarData,
    ContractData
)
from vnpy.trader.event import EVENT_TICK, EVENT_CONTRACT
from vnpy.trader.utility import load_json, save_json, BarGenerator
from vnpy.trader.database import database_manager


APP_NAME = "DataRecorder"

EVENT_RECORDER_LOG = "eRecorderLog"
EVENT_RECORDER_UPDATE = "eRecorderUpdate"


class RecorderEngine(BaseEngine):
    """"""
    setting_filename = "data_recorder_setting.json"

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super().__init__(main_engine, event_engine, APP_NAME)

        self.queue = Queue()
        self.thread = Thread(target=self.run)
        self.active = False

        self.tick_recordings = {}
        self.bar_recordings = {}
        self.bar_generators = {}

        self.load_setting()
        self.register_event()
        self.start()
        self.put_event()

    def load_setting(self):
        """"""
        setting = load_json(self.setting_filename)
        self.tick_recordings = setting.get("tick", {})
        self.bar_recordings = setting.get("bar", {})

    def save_setting(self):
        """"""
        setting = {
            "tick": self.tick_recordings,
            "bar": self.bar_recordings
        }
        save_json(self.setting_filename, setting)

    def run(self):
        """
        调用database_manager的方法储存数据
        """
        while self.active:
            try:
                task = self.queue.get(timeout=1)
                task_type, data = task

                if task_type == "tick":
                    database_manager.save_tick_data([data])
                elif task_type == "bar":
                    database_manager.save_bar_data([data])

            except Empty:
                continue

    def close(self):
        """"""
        self.active = False

        if self.thread.isAlive():
            self.thread.join()

    def start(self):
        """"""
        self.active = True
        self.thread.start()

    def add_bar_recording(self, vt_symbol: str):
        """
        将symbol数据写入bar_recordings["symbol"]这个字典,订阅合约,调用save_setting进行保存(里面有tick_recordings,bar_recordings字典),
        然后调用put_event(实质为调用Eventengine的self._queue.put(event)),将事件放入队列。
        """
        if vt_symbol in self.bar_recordings:
            self.write_log(f"已在K线记录列表中:{vt_symbol}")
            return

        contract = self.main_engine.get_contract(vt_symbol)
        if not contract:
            self.write_log(f"找不到合约:{vt_symbol}")
            return

        self.bar_recordings[vt_symbol] = {
            "symbol": contract.symbol,
            "exchange": contract.exchange.value,
            "gateway_name": contract.gateway_name
        }

        self.subscribe(contract)
        self.save_setting()
        self.put_event()

        self.write_log(f"添加K线记录成功:{vt_symbol}")

    def add_tick_recording(self, vt_symbol: str):
        """"""
        if vt_symbol in self.tick_recordings:
            self.write_log(f"已在Tick记录列表中:{vt_symbol}")
            return

        contract = self.main_engine.get_contract(vt_symbol)
        if not contract:
            self.write_log(f"找不到合约:{vt_symbol}")
            return

        self.tick_recordings[vt_symbol] = {
            "symbol": contract.symbol,
            "exchange": contract.exchange.value,
            "gateway_name": contract.gateway_name
        }

        self.subscribe(contract)
        self.save_setting()
        self.put_event()

        self.write_log(f"添加Tick记录成功:{vt_symbol}")

    def remove_bar_recording(self, vt_symbol: str):
        """"""
        if vt_symbol not in self.bar_recordings:
            self.write_log(f"不在K线记录列表中:{vt_symbol}")
            return

        self.bar_recordings.pop(vt_symbol)
        self.save_setting()
        #调用下面的put_event方法,将EVENT_RECORDER_UPDATE放入队列
        self.put_event()

        self.write_log(f"移除K线记录成功:{vt_symbol}")

    def remove_tick_recording(self, vt_symbol: str):
        """"""
        if vt_symbol not in self.tick_recordings:
            self.write_log(f"不在Tick记录列表中:{vt_symbol}")
            return

        self.tick_recordings.pop(vt_symbol)
        self.save_setting()
        self.put_event()

        self.write_log(f"移除Tick记录成功:{vt_symbol}")

    def register_event(self):
        """"""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)

    def process_tick_event(self, event: Event):
        """
        调用下面的record_tick方法(其实就是将task put到queue)
        """
        tick = event.data

        if tick.vt_symbol in self.tick_recordings:
            self.record_tick(tick)

        if tick.vt_symbol in self.bar_recordings:
            bg = self.get_bar_generator(tick.vt_symbol)
            bg.update_tick(tick)

    def process_contract_event(self, event: Event):
        """"""
        contract = event.data
        vt_symbol = contract.vt_symbol

        if (vt_symbol in self.tick_recordings or vt_symbol in self.bar_recordings):
            self.subscribe(contract)

    def write_log(self, msg: str):
        """"""
        event = Event(
            EVENT_RECORDER_LOG,
            msg
        )
        self.event_engine.put(event)

    def put_event(self):
        """
        调用event_engine的put方法
        """
        tick_symbols = list(self.tick_recordings.keys())
        tick_symbols.sort()

        bar_symbols = list(self.bar_recordings.keys())
        bar_symbols.sort()

        data = {
            "tick": tick_symbols,
            "bar": bar_symbols
        }

        event = Event(
            EVENT_RECORDER_UPDATE,
            data
        )
        self.event_engine.put(event)

    def record_tick(self, tick: TickData):
        """
        将task put到queue, 不过这里为什么要用self.queue,不是调用put_event?
        """
        task = ("tick", copy(tick))
        self.queue.put(task)

    def record_bar(self, bar: BarData):
        """"""
        task = ("bar", copy(bar))
        self.queue.put(task)

    def get_bar_generator(self, vt_symbol: str):
        """
        从bar_generators这个字典通过symbol取出bg的实例
        """
        bg = self.bar_generators.get(vt_symbol, None)

        if not bg:
            bg = BarGenerator(self.record_bar)
            self.bar_generators[vt_symbol] = bg

        return bg

    def subscribe(self, contract: ContractData):
        """"""
        req = SubscribeRequest(
            symbol=contract.symbol,
            exchange=contract.exchange
        )
        self.main_engine.subscribe(req, contract.gateway_name)

二、database_manager

在DataRecorder模块有下面的代码

from vnpy.trader.database import database_manager
                if task_type == "tick":
                    database_manager.save_tick_data([data])
                elif task_type == "bar":
                    database_manager.save_bar_data([data])

可是在vnpy.trader.database下面没有找到这个database_manager,
只在init.py找到下面这句。

from vnpy.trader.database.database import BaseDatabaseManager

database_manager: "BaseDatabaseManager" = init(settings=settings)

原来它调用的是initialize中的init方法,返回的是BaseDatabaseManager对象
它的save_tick_data方法是在C:\vnstudio\Lib\site-packages\vnpy\~rader\database\database.py文件中BaseDatabaseManager这个类定义的。
然后在上面的import中,将这个类import了进来。

    @abstractmethod
    def save_tick_data(
        self,
        datas: Sequence["TickData"],
    ):
        pass

参考:https://www.vnpy.com/forum/topic/805-databaseyuan-ma-yue-du-bi-ji-+pei-zhi-jiao-cheng

暧昧帖

本文暂无标签

发表评论

*

*