VNPY源码学习系列文章:
VNPY源码(一)CTP封装及K线合成
VNPY源码(二)API获取行情和script_trader
VNPY源码(三)主引擎MainEngine
VNPY源码(四)DataRecorder
VNPY源码(五)CtaEngine实盘引擎
VNPY源码(六)BacktesterEngine回测引擎
VNPY源码(七)限价单与停止单
VNPY源码(八)VNPY的数据流
一、概述
CtaEngine引擎是策略的容器,它启动的时候会将所有的策略都加载进来。
在no_ui的run.py里面,
SETTINGS["log.active"] = True SETTINGS["log.level"] = INFO SETTINGS["log.console"] = True ctp_setting = { "用户名": "", "密码": "", "经纪商代码": "", "交易服务器": "", "行情服务器": "", "产品名称": "", "授权编码": "", "产品信息": "" } def run_child(): """ Running in the child process. """ SETTINGS["log.file"] = True event_engine = EventEngine() main_engine = MainEngine(event_engine) main_engine.add_gateway(CtpGateway) cta_engine = main_engine.add_app(CtaStrategyApp) main_engine.write_log("主引擎创建成功") log_engine = main_engine.get_engine("log") event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event) main_engine.write_log("注册日志事件监听") main_engine.connect(ctp_setting, "CTP") main_engine.write_log("连接CTP接口") sleep(10) cta_engine.init_engine() #从rqdata获得数据,加载所有策略文件,加载setting文件中设定的策略,加载cta_strategy_data.json文件中的数据,注册tick,order等事件。 main_engine.write_log("CTA策略初始化完成") cta_engine.init_all_strategies() #订阅行情 sleep(60) # Leave enough time to complete strategy initialization main_engine.write_log("CTA策略全部初始化") cta_engine.start_all_strategies() #设定strategy.trading = True,启动策略 main_engine.write_log("CTA策略全部启动") while True: sleep(1)
有一句:
cta_engine = main_engine.add_app(CtaStrategyApp)
其实就相当于相当于CtaEngine(MainEngine(),event_engine)。
我们先来看一下CtaStrategyApp是哪里来的,它是在vnpy/app/cta_strategy/__init__.py这个文件里面定义的。
class CtaStrategyApp(BaseApp): """""" app_name = APP_NAME app_module = __module__ app_path = Path(__file__).parent display_name = "CTA策略" engine_class = CtaEngine widget_name = "CtaManager" icon_name = "cta.ico"
再来看一下这个add_app
它传入的参数是BaseApp。
这个函数的作用相当于:实例化传入的BaseApp(CtaStrategyApp),并添加到self.apps字典里,再添加CtaEngine(传入了MainEngine(),event_engine()), 返回了一个实例化的CtaEngine
def add_app(self, app_class: Type[BaseApp]): """ Add app. """ app = app_class() #实例化类 self.apps[app.app_name] = app #将类对像添加到 self.apps字典里。 engine = self.add_engine(app.engine_class) #这里app.engine_class就是CtaStrategyApp中的engine_class = CtaEngine return engine
#再看add_engine,参数形式为:add_engine(CtaEngine),
def add_engine(self, engine_class: Any): """ Add function engine.(添加功能引擎) """ engine = engine_class(self, self.event_engine) #self指当前类的实例,相当于CtaEngine(MainEngine(),event_engine()),即将主引擎,事件引擎传入了CtaEngine。 self.engines[engine.engine_name] = engine #将实例化后的CtaEngine存入到self.engines字典里面,CtaEngine继承自BaseEngine,BaseEngine中有engine_name return engine #返回实例化后的引擎
顺便说一下mainEngine,它的作用就是负责对所有引擎实例化。
二、CtaEngine
在vnpy/app/cta_strategy/__init__.py里面,有from .engine import CtaEngine这样的语句,其实就是从在vnpy/app/cta_strategy/engine.py这个文件里面导入CtaEngine。下面我们来看这个CtaEngine。
首先,它是继承自BaseEngine,然后初始化的时候,需要传入两个engine.
class CtaEngine(BaseEngine): """""" engine_type = EngineType.LIVE # live trading engine setting_filename = "cta_strategy_setting.json" data_filename = "cta_strategy_data.json" def __init__(self, main_engine: MainEngine, event_engine: EventEngine): """""" super(CtaEngine, self).__init__( main_engine, event_engine, APP_NAME)
在run.py里面,使用了它的下面三个功能:
cta_engine.init_engine() main_engine.write_log("CTA策略初始化完成") cta_engine.init_all_strategies() sleep(60) # Leave enough time to complete strategy initialization main_engine.write_log("CTA策略全部初始化") cta_engine.start_all_strategies() main_engine.write_log("CTA策略全部启动")
(一).init_engine()
def init_engine(self): """ """ self.init_rqdata() self.load_strategy_class() #载入策略 self.load_strategy_setting() self.load_strategy_data() self.register_event() self.write_log("CTA策略引擎初始化成功")
1.init_rqdata()
初始化rqdata,可以看到有from vnpy.trader.rqdata import rqdata_client,说明这里对米筐进行了封装,因为我自己之前用的米筐是直接使用rq.init()。
2.load_strategy_class()
其实最终的结果就是执行了“self.classes[value.__name__] = value”这一句。
def load_strategy_class(self): """ Load strategy class from source code. """ path1 = Path(__file__).parent.joinpath("strategies") #就是变成“strategies\”这样的形式 self.load_strategy_class_from_folder( path1, "vnpy.app.cta_strategy.strategies") path2 = Path.cwd().joinpath("strategies") #就是变成 “C:\Users\Kevin\Desktop\py2\strategies” 这样的形式 self.load_strategy_class_from_folder(path2, "strategies") def load_strategy_class_from_folder(self, path: Path, module_name: str = ""): """ Load strategy class from certain folder.根据传入的文件夹,以及module_name,遍历方位夹内的文件,找到.py文件,拼接成module_name.double_ma这样的形式,再传入到load_strategy_class_from_module这个函数。 """ for dirpath, dirnames, filenames in os.walk(str(path)): for filename in filenames: #遍历strategies下所有的文件 if filename.endswith(".py"): #如果是.py文件 strategy_module_name = ".".join( #拼接成 strategies.double_ma_strategy这样的形式 [module_name, filename.replace(".py", "")]) self.load_strategy_class_from_module(strategy_module_name) #执行load_strategy_class_from_module(strategies.double_ma_strategy) def load_strategy_class_from_module(self, module_name: str): """ Load strategy class from module file. """ try: module = importlib.import_module(module_name) #绝对导入,其中的module_name为strategies.double_ma_strategy。 for name in dir(module): #遍历double_ma_strategy的所有属性、方法 value = getattr(module, name) #只有当name是__class__的时候,isinstance(value, type)才是True,issubclass(value, CtaTemplate) 调试不出来。 if (isinstance(value, type) and issubclass(value, CtaTemplate) and value is not CtaTemplate): self.classes[value.__name__] = value #将value放到self.classes这个字典里,value.__name__应该就是策略的class_name。 except: # noqa msg = f"策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}" self.write_log(msg)
知识点:
(1).os.walk()方法
os.walk方法,主要用来遍历一个目录内各个子目录和子文件。
os.walk(top, topdown=True, onerror=None, followlinks=False)
可以得到一个三元tupple(dirpath, dirnames, filenames),
第一个为起始路径,第二个为起始路径下的文件夹,第三个是起始路径下的文件。
dirpath 是一个string,代表目录的路径,
dirnames 是一个list,包含了dirpath下所有子目录的名字。
filenames 是一个list,包含了非目录文件的名字。
这些名字不包含路径信息,如果需要得到全路径,需要使用os.path.join(dirpath, name).
(2).import importlib
一个函数运行需要根据不同项目的配置,动态导入对应的配置文件运行。
import importlib # 绝对导入 a = importlib.import_module("clazz.a") a.show() # show A # 相对导入 b = importlib.import_module(".b", "clazz") b.show() # show B #注意,相对导入有个一点., 类似路径
(3).dir()
返回模块的属性列表
class A(object): name = "xxx" bar = 1 def fu(sefl): pass a = A() for xxx in dir(a): print(xxx)
https://www.cnblogs.com/aademeng/articles/7259986.html
(4)getattr()
getattr() 函数用于返回一个对象属性值。最好的理解是通过示例:
>>>class A(object): ... bar = 1 ... >>> a = A() >>> getattr(a, 'bar') # 获取属性 bar 值 1
(5)isinstance
isinstance() 函数来判断一个对象是否是一个已知的类型,类似 type()
>>>a = 2 >>> isinstance (a,int) True
class A(): test = "xxx" bar = 1 def fu(sefl): pass a = A() for i in dir(a): print("i是{}".format(i)) value = getattr(a, i) print("value是{}".format(value)) print(isinstance(value, type)) print("*"*80)
3.load_strategy_setting
加载users/你的用户名/.vntrader/下面的策略,并且通过add_strategy实例化多个策略类。
def load_strategy_setting(self): """ Load setting file. """ self.strategy_setting = load_json(self.setting_filename) #加载cta_strategy_setting.json for strategy_name, strategy_config in self.strategy_setting.items(): #对字典进行遍历。 self.add_strategy( strategy_config["class_name"], #策略的类名 strategy_name, #你自己输入的具体策略名称,比如rb1910螺纹,因为一个策略可以跑多个品种,多个周期。 strategy_config["vt_symbol"], strategy_config["setting"] #从下图可以看出,这里存的就是classname:oneMinuteStrategy )
通过load_json加载了cta_strategy_setting.json文件,并执行了add_strategy。
备注:cta_strategy_setting.json这个文件是在安装了vnpy之后,在users/你的用户名/.vntrader/下面生成的。
它和你打开vnpy的UI界面,然后执行“功能-CTA策略”打开的窗口看到的已添加策略是对应的。比如在这个界面将rb1910螺纹删除,那么cta_strategy_setting.json这个文件就会变成一个空文件。
同时生成的还有cta_strtegy_data.json文件。
再看看add_strategy
它做的事情就是:添加一个实例化的类,并将这个类放到self.strategies这个字典里面,最后建立合约与实例化的策略类的映射,更新cta_strategy_setting.json文件,再将event put出去。
它要传入的参数是这样的:add_strategy("oneMinuteStrategy","rb1910螺纹","rb1910.SHFE",{"class_name":"oneMinuteStrategy"})
def add_strategy( self, class_name: str, strategy_name: str, vt_symbol: str, setting: dict ): """ Add a new strategy. """ if strategy_name in self.strategies: #如果策略名已经在self.strategies字典中了。见下面的示例。 self.write_log(f"创建策略失败,存在重名{strategy_name}") return strategy_class = self.classes.get(class_name, None) #通过get从self.classes字典找到相应的策略类。 if not strategy_class: self.write_log(f"创建策略失败,找不到策略类{class_name}") return #实例化一个策略类,查看策略类源码,可以发现需要传入的参数为def __init__(self, cta_engine, strategy_name, vt_symbol, setting),下面的self就表示前当的引擎CtaEngine。 strategy = strategy_class(self, strategy_name, vt_symbol, setting) self.strategies[strategy_name] = strategy #将实例化后的类存到self.strategies这个字典里面 # Add vt_symbol to strategy map. 将rb1910添加到合约与策略的映射。 strategies = self.symbol_strategy_map[vt_symbol] #symbol_strategy_map是一个symbol、strategy映射字典 strategies.append(strategy) # Update to setting file. self.update_strategy_setting(strategy_name, setting) 更新cta_strategy_setting.json文件 self.put_strategy_event(strategy)
比如判断某值是否在字典中:
aa = {'a':"ccc",'b':"cc",'c':"333"} b = "a" print(aa[b]) if "a" in aa: print("a在字典中") from collections import defaultdict strategy="teststrategy" vt_symbol="rb1910.SHFE" symbol_strategy_map = defaultdict(list) strategies = symbol_strategy_map[vt_symbol] print(strategies) print(symbol_strategy_map) strategies.append(strategy) print(strategies) print(symbol_strategy_map)
再看update_strategy_setting这个函数,它的作用就是将json数据存到setting_filename(即cta_strategy_setting.json)文件。
def update_strategy_setting(self, strategy_name: str, setting: dict): """ Update setting file. """ strategy = self.strategies[strategy_name] self.strategy_setting[strategy_name] = { "class_name": strategy.__class__.__name__, "vt_symbol": strategy.vt_symbol, "setting": setting, } save_json(self.setting_filename, self.strategy_setting)
再看put_strategy_event函数。这个顾名思议,就是将event put出去。
def put_strategy_event(self, strategy: CtaTemplate): """ Put an event to update strategy status. """ data = strategy.get_data() #就是获得策略的相关信息 event = Event(EVENT_CTA_STRATEGY, data) #将策略的data传到Event事件引擎 self.event_engine.put(event)
(1).get_data
这个来自doubleMA之类策略的父类CtaTemplate,就是获得策略的相关信息
def get_data(self): """ Get strategy data. """ strategy_data = { "strategy_name": self.strategy_name, "vt_symbol": self.vt_symbol, "class_name": self.__class__.__name__, "author": self.author, "parameters": self.get_parameters(), "variables": self.get_variables(), } return strategy_data
(2).这里有一个EVENT_CTA_STRATEGY,又是从base.py里面导入进来的,我们看它的定义,
EVENT_CTA_LOG = "eCtaLog" EVENT_CTA_STRATEGY = "eCtaStrategy" EVENT_CTA_STOPORDER = "eCtaStopOrder"
可以知道,它就是一个事件。
4.load_strategy_data函数
这个函数的作用就是从cta_strategy_data.json中取出数据。因为根据之前的定义,data_filename = "cta_strategy_data.json"。
取到的数据在init_all_strategies的时候需要用到。
def load_strategy_data(self): """ Load strategy data from json file. """ self.strategy_data = load_json(self.data_filename)
我们看看这个json文件长什么样:
5.register_event
注册事件引擎
def register_event(self): """""" self.event_engine.register(EVENT_TICK, self.process_tick_event) self.event_engine.register(EVENT_ORDER, self.process_order_event) self.event_engine.register(EVENT_TRADE, self.process_trade_event) self.event_engine.register(EVENT_POSITION, self.process_position_event)
二、init_all_strategies()
最终的结果是订阅了symbol的行情,并执行了strategy.on_init(这个on_init是在策略里面定义的,其功能就是self.load_bar(10))。
def init_all_strategies(self): """ """ for strategy_name in self.strategies.keys(): self.init_strategy(strategy_name) def init_strategy(self, strategy_name: str): """ Init a strategy. """ self.init_queue.put(strategy_name) #将strategy_name put进去 if not self.init_thread: self.init_thread = Thread(target=self._init_strategy) self.init_thread.start()
这个init_queue是在这个类的init中定义的:self.init_queue = Queue()。
init_thread也是在init中定义的:self.init_thread = None
小知识:
queue.empty:exception Queue.Empty 的异常。
再看一下这个_init_strategy
def _init_strategy(self): """ Init strategies in queue. """ while not self.init_queue.empty(): strategy_name = self.init_queue.get() strategy = self.strategies[strategy_name] if strategy.inited: self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作") continue self.write_log(f"{strategy_name}开始执行初始化") # Call on_init function of strategy self.call_strategy_func(strategy, strategy.on_init) # Restore strategy data(variables) data = self.strategy_data.get(strategy_name, None) if data: for name in strategy.variables: value = data.get(name, None) if value: setattr(strategy, name, value) # Subscribe market data contract = self.main_engine.get_contract(strategy.vt_symbol) #获得symbol if contract: req = SubscribeRequest( symbol=contract.symbol, exchange=contract.exchange) #这里是拼接代码 self.main_engine.subscribe(req, contract.gateway_name) #调用主引擎的subscribe,而主引擎的subsribe又是调用Gateway的subscribe.可以看下面的流程图: else: self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy) # Put event to update init completed status. strategy.inited = True self.put_strategy_event(strategy) self.write_log(f"{strategy_name}初始化完成") self.init_thread = None
这里的put_strategy_event函数在上面已经有了。
这里又有一个call_strategy_func,它的解释是“Call function of a strategy and catch any exception raised.”执行策略中的函数,不过需要传入策略,策略的函数两个参数。其实策略也可以不传,但是就无法让后面的strategy.trading = False了。
它的用法是这样的:self.call_strategy_func(strategy, strategy.on_init),其实就是执行strategy.on_init。
def call_strategy_func( self, strategy: CtaTemplate, func: Callable, params: Any = None ): """ Call function of a strategy and catch any exception raised. """ try: if params: func(params) else: func() except Exception: strategy.trading = False strategy.inited = False msg = f"触发异常已停止\n{traceback.format_exc()}" self.write_log(msg, strategy)
三、start_all_strategies()
这个函数非常短,就是遍历strategies.keys,然后依次执行start_strategy。最终的结果就是执行strategy.on_start(其实on_start啥都没做,就是输出“策略启动”,再就是执行self.put_event())。
我们来看看start_strategy,它的功能很简单,其实就是执行策略中的on_start函数。
def start_strategy(self, strategy_name: str): """ Start a strategy. """ strategy = self.strategies[strategy_name] #从self.strategies字典中取出键名为“XX”的策略实例 因为add_strategy那里添加的是实例 。 if not strategy.inited: self.write_log(f"策略{strategy.strategy_name}启动失败,请先初始化") return if strategy.trading: self.write_log(f"{strategy_name}已经启动,请勿重复操作") return self.call_strategy_func(strategy, strategy.on_start) strategy.trading = True self.put_strategy_event(strategy)
四、其他函数
cta_engine里面有一个check_stop_order函数。
def check_stop_order(self, tick: TickData): """ 这个函数就是执行strategy的on_stop_order函数。 """ for stop_order in list(self.stop_orders.values()): if stop_order.vt_symbol != tick.vt_symbol: continue long_triggered = ( stop_order.direction == Direction.LONG and tick.last_price >= stop_order.price ) short_triggered = ( stop_order.direction == Direction.SHORT and tick.last_price <= stop_order.price ) if long_triggered or short_triggered: strategy = self.strategies[stop_order.strategy_name] # To get excuted immediately after stop order is # triggered, use limit price if available, otherwise # use ask_price_5 or bid_price_5 if stop_order.direction == Direction.LONG: if tick.limit_up: price = tick.limit_up else: price = tick.ask_price_5 else: if tick.limit_down: price = tick.limit_down else: price = tick.bid_price_5 contract = self.main_engine.get_contract(stop_order.vt_symbol) vt_orderids = self.send_limit_order( strategy, contract, stop_order.direction, stop_order.offset, price, stop_order.volume, stop_order.lock ) # Update stop order status if placed successfully if vt_orderids: # Remove from relation map. self.stop_orders.pop(stop_order.stop_orderid) strategy_vt_orderids = self.strategy_orderid_map[strategy.strategy_name] if stop_order.stop_orderid in strategy_vt_orderids: strategy_vt_orderids.remove(stop_order.stop_orderid) # Change stop order status to cancelled and update to strategy. stop_order.status = StopOrderStatus.TRIGGERED stop_order.vt_orderids = vt_orderids self.call_strategy_func( strategy, strategy.on_stop_order, stop_order ) self.put_stop_order_event(stop_order)
那么这个函数在哪儿被调用呢?
def process_tick_event(self, event: Event): """""" tick = event.data strategies = self.symbol_strategy_map[tick.vt_symbol] if not strategies: return self.check_stop_order(tick) for strategy in strategies: if strategy.inited: self.call_strategy_func(strategy, strategy.on_tick, tick)
这里面的symbol_strategy_map是在CtaEngine的init里面定义的:
self.symbol_strategy_map = defaultdict(list) # vt_symbol: strategy list
再来看看put_stop_order_event函数,它传入的是StopOrder。
def put_stop_order_event(self, stop_order: StopOrder): """ Put an event to update stop order status. """ event = Event(EVENT_CTA_STOPORDER, stop_order) self.event_engine.put(event)
最后附上别人画的一张流程图:
process_tick根据symbol分发给相应的策略:
还有这里有一篇文章写得很好,可以参考:
https://www.vnpy.com/forum/topic/1064-ctace-lue-mo-ni-jiao-yi-xue-xi-ji-lu-yi
六、onTrade,onOrder
在单一合约里,onTrade,onOrder是一样原。
在套利的时候,买卖一个跨期的合约,spM1705&m1709,多m1705,空m1709
onOrder会收到一条信息
onTrade会收到两条信息。