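Snail Blog: VNPY source code study series
VNPY Source Code (1): CTP Wrapper and Bar (K-line) Synthesis
VNPY Source Code (2): Fetching Market Data via the API, and script_trader
VNPY Source Code (3): The Main Engine, MainEngine
VNPY Source Code (4): DataRecorder
VNPY Source Code (5): CtaEngine, the Live Trading Engine
VNPY Source Code (6): BacktesterEngine, the Backtesting Engine
VNPY Source Code (7): Limit Orders and Stop Orders
VNPY Source Code (8): VNPY's Data Flow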
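Tip:
Run backtests from a script whenever you can. Backtesting through the UI often fails without printing any message at all, which will drive you crazy.
Let's start with a script that drives the backtesting engine: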
from datetime import datetime

from vnpy.app.cta_strategy.backtesting import BacktestingEngine
from vnpy.app.cta_strategy.strategies.boll_channel_strategy import BollChannelStrategy

engine = BacktestingEngine()
engine.set_parameters(
    vt_symbol="000001.SZSE",
    interval="d",
    start=datetime(2018, 3, 23),
    end=datetime(2018, 4, 23),
    rate=0,
    slippage=0,
    size=300,
    pricetick=0.2,
    capital=1_000_000,
)
engine.add_strategy(BollChannelStrategy, {})
engine.load_data()
engine.run_backtesting()
df = engine.calculate_result()
engine.calculate_statistics()
engine.show_chart()
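I. The set_parameters function
First, be clear about which class this is: BacktestingEngine, defined in vnpy/app/cta_strategy/backtesting.py. There is also a BacktesterEngine in vnpy/app/cta_backtester/engine.py, and the two names are easy to mix up.
There is nothing special about this function; it simply stores the arguments on the engine.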
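One detail worth keeping in mind: load_data later reads self.symbol and self.exchange, while the script only passes vt_symbol, so the combined string has to be split somewhere. The sketch below shows one way that split could look. The helper name split_vt_symbol is hypothetical and is not taken from the VNPY source.

```python
from typing import Tuple


def split_vt_symbol(vt_symbol: str) -> Tuple[str, str]:
    """Hypothetical helper: split "symbol.exchange" into its two parts."""
    # Split on the last dot so a symbol that itself contains a dot survives.
    symbol, exchange = vt_symbol.rsplit(".", 1)
    return symbol, exchange


symbol, exchange = split_vt_symbol("000001.SZSE")
print(symbol, exchange)
```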
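II. add_strategy
It takes the strategy class and its settings and instantiates the strategy. With the script above this is equivalent to calling BollChannelStrategy(engine, strategy_name, vt_symbol, setting), where the first argument is the backtesting engine itself.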
def add_strategy(self, strategy_class: type, setting: dict):
    """"""
    self.strategy_class = strategy_class
    self.strategy = strategy_class(
        self, strategy_class.__name__, self.vt_symbol, setting
    )
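III. load_data
The end result: DbBarData (or DbTickData) records are fetched through the database ORM, each record is converted with its to_bar or to_tick method, and the resulting bars or ticks are stored in self.history_data.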
def load_data(self):
    """"""
    self.output("开始加载历史数据")  # "Start loading history data"

    if not self.end:
        self.end = datetime.now()

    if self.start >= self.end:
        self.output("起始日期必须小于结束日期")  # "Start date must be before end date"
        return

    self.history_data.clear()  # Clear previously loaded history data

    # Load 30 days of data each time and allow for progress update
    progress_delta = timedelta(days=30)
    total_delta = self.end - self.start

    start = self.start
    end = self.start + progress_delta
    progress = 0

    while start < self.end:
        end = min(end, self.end)  # Make sure end time stays within set range

        if self.mode == BacktestingMode.BAR:
            data = load_bar_data(
                self.symbol, self.exchange, self.interval, start, end
            )
        else:
            data = load_tick_data(
                self.symbol, self.exchange, start, end
            )

        self.history_data.extend(data)

        progress += progress_delta / total_delta
        progress = min(progress, 1)
        progress_bar = "#" * int(progress * 10)
        self.output(f"加载进度:{progress_bar} [{progress:.0%}]")  # "Loading progress"

        start = end
        end += progress_delta

    # "History data loaded, number of records: ..."
    self.output(f"历史数据加载完成,数据量:{len(self.history_data)}")
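The windowing logic in load_data is easier to see in isolation. The following is a minimal sketch that reproduces only the 30-day stepping and the progress calculation. The generator name iter_windows is made up for this illustration, and no database is involved.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def iter_windows(
    start: datetime,
    end: datetime,
    step: timedelta = timedelta(days=30),
) -> Iterator[Tuple[datetime, datetime, float]]:
    """Yield (window_start, window_end, progress) the way load_data steps."""
    total = end - start
    progress = 0.0
    window_start = start
    window_end = start + step
    while window_start < end:
        # Clamp the last window so it never runs past the requested end.
        window_end = min(window_end, end)
        # timedelta / timedelta is a plain float ratio.
        progress = min(progress + step / total, 1)
        yield window_start, window_end, progress
        window_start = window_end
        window_end += step


windows = list(iter_windows(datetime(2018, 3, 23), datetime(2018, 6, 1)))
for window_start, window_end, progress in windows:
    print(window_start.date(), window_end.date(), f"{progress:.0%}")
```

Consecutive windows share a boundary timestamp. Since the query in MongoManager.load_bar_data uses inclusive bounds on both sides, a bar stamped exactly on a boundary would appear to be loaded twice, which may be worth checking against your own data.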
1. load_bar_data
@lru_cache(maxsize=999)
def load_bar_data(
    symbol: str,
    exchange: Exchange,
    interval: Interval,
    start: datetime,
    end: datetime
):
    """"""
    return database_manager.load_bar_data(
        symbol, exchange, interval, start, end
    )
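It simply calls the load_bar_data method of database_manager. The import at the top of the file reads from vnpy.trader.database import database_manager, yet there is no database_manager module under vnpy/trader/database. The only definition is this line in __init__.py: database_manager: "BaseDatabaseManager" = init(settings=settings). According to the official explanation:
database_manager is defined inside the database package. Based on the database configuration in GlobalSetting, it is created automatically as a differently configured object, and the init function is what returns this object.
Then, in vnpy\trader\database\database_mongo.py, we find the following code.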
class MongoManager(BaseDatabaseManager):

    def load_bar_data(
        self,
        symbol: str,
        exchange: Exchange,
        interval: Interval,
        start: datetime,
        end: datetime,
    ) -> Sequence[BarData]:
        s = DbBarData.objects(
            symbol=symbol,
            exchange=exchange.value,
            interval=interval.value,
            datetime__gte=start,
            datetime__lte=end,
        )
        data = [db_bar.to_bar() for db_bar in s]
        return data
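The .objects call here is the query syntax of MongoEngine, the object-document mapper used for MongoDB. The datetime__gte and datetime__lte keywords mean "greater than or equal to" and "less than or equal to".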
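The @lru_cache(maxsize=999) decorator on the module-level load_bar_data means that repeating a backtest over the same symbol and date windows does not hit the database again. A small sketch with a stand-in loader illustrates the effect. The function load_bars and the counter are invented for the demonstration and do not touch any real database.

```python
from functools import lru_cache

call_count = 0  # How many times the "query" body really ran.


@lru_cache(maxsize=999)
def load_bars(symbol: str, start: str, end: str) -> tuple:
    """Stand-in for a database query (hypothetical)."""
    global call_count
    call_count += 1
    return (f"{symbol}:{start}", f"{symbol}:{end}")


first = load_bars("000001", "2018-03-23", "2018-04-22")
second = load_bars("000001", "2018-03-23", "2018-04-22")  # Served from cache.

print(call_count)
print(load_bars.cache_info())
```

Two consequences follow from how functools.lru_cache works: every argument must be hashable, and the cached object is returned by reference, so mutating a cached list would also change what later callers receive.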
2. The to_bar function:
def to_bar(self):
    """
    Generate BarData object from DbBarData.
    """
    bar = BarData(
        symbol=self.symbol,
        exchange=Exchange(self.exchange),
        datetime=self.datetime,
        interval=Interval(self.interval),
        volume=self.volume,
        open_interest=self.open_interest,
        open_price=self.open_price,
        high_price=self.high_price,
        low_price=self.low_price,
        close_price=self.close_price,
        gateway_name="DB",
    )
    return bar
Next, the to_tick function:
def to_tick(self):
    """
    Generate TickData object from DbTickData.
    """
    tick = TickData(
        symbol=self.symbol,
        exchange=Exchange(self.exchange),
        datetime=self.datetime,
        name=self.name,
        volume=self.volume,
        open_interest=self.open_interest,
        last_price=self.last_price,
        last_volume=self.last_volume,
        limit_up=self.limit_up,
        limit_down=self.limit_down,
        open_price=self.open_price,
        high_price=self.high_price,
        low_price=self.low_price,
        pre_close=self.pre_close,
        bid_price_1=self.bid_price_1,
        ask_price_1=self.ask_price_1,
        bid_volume_1=self.bid_volume_1,
        ask_volume_1=self.ask_volume_1,
        gateway_name="DB",
    )

    # Levels 2 to 5 are only copied when level-2 depth is present.
    if self.bid_price_2:
        tick.bid_price_2 = self.bid_price_2
        tick.bid_price_3 = self.bid_price_3
        tick.bid_price_4 = self.bid_price_4
        tick.bid_price_5 = self.bid_price_5

        tick.ask_price_2 = self.ask_price_2
        tick.ask_price_3 = self.ask_price_3
        tick.ask_price_4 = self.ask_price_4
        tick.ask_price_5 = self.ask_price_5

        tick.bid_volume_2 = self.bid_volume_2
        tick.bid_volume_3 = self.bid_volume_3
        tick.bid_volume_4 = self.bid_volume_4
        tick.bid_volume_5 = self.bid_volume_5

        tick.ask_volume_2 = self.ask_volume_2
        tick.ask_volume_3 = self.ask_volume_3
        tick.ask_volume_4 = self.ask_volume_4
        tick.ask_volume_5 = self.ask_volume_5

    return tick
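IV. run_backtesting
This function initializes the strategy and then replays history_data. For each bar it matches limit orders, matches stop orders, and then calls the strategy's on_bar.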
def run_backtesting(self):
    """"""
    if self.mode == BacktestingMode.BAR:
        func = self.new_bar
    else:
        func = self.new_tick

    self.strategy.on_init()

    # Use the first [days] of history data for initializing strategy
    day_count = 0
    ix = 0

    for ix, data in enumerate(self.history_data):
        if self.datetime and data.datetime.day != self.datetime.day:
            day_count += 1
            if day_count >= self.days:
                break

        self.datetime = data.datetime
        self.callback(data)

    self.strategy.inited = True
    self.output("策略初始化完成")  # "Strategy initialized"

    self.strategy.on_start()
    self.strategy.trading = True
    self.output("开始回放历史数据")  # "Start replaying history data"

    # Use the rest of history data for running backtesting
    for data in self.history_data[ix:]:
        func(data)

    self.output("历史数据回放结束")  # "History data replay finished"
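1. history_data
history_data is what engine.load_data() produced. Note that ix = 0 is only the starting value. The initialization loop rebinds ix on every iteration and breaks once self.days days have been consumed, so history_data[ix:] is the data left over after the initialization period. It is the whole data set only if the loop breaks at index 0 or history_data is empty.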
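To see where the replay actually starts, the day-counting loop can be run on a handful of timestamps. This is a sketch under two assumptions: that the break is nested inside the day-change check, as the comment in run_backtesting suggests, and that only the timestamps matter. The function name first_replay_index is invented here.

```python
from datetime import datetime
from typing import List


def first_replay_index(timestamps: List[datetime], days: int) -> int:
    """Return ix as left behind by the initialization loop."""
    last = None
    day_count = 0
    ix = 0
    for ix, ts in enumerate(timestamps):
        # A change of calendar day counts as one finished initialization day.
        if last and ts.day != last.day:
            day_count += 1
            if day_count >= days:
                break
        last = ts
    return ix


# Three trading days with two bars each.
stamps = [
    datetime(2018, 3, 26, 10), datetime(2018, 3, 26, 14),
    datetime(2018, 3, 27, 10), datetime(2018, 3, 27, 14),
    datetime(2018, 3, 28, 10), datetime(2018, 3, 28, 14),
]
print(first_replay_index(stamps, days=2))
print(first_replay_index(stamps, days=1))
```

With days=2 the first two days feed initialization and the replay begins at the first bar of the third day. If the data runs out before the break, ix ends up at the last index, so only the final bar would be replayed.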
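2. The new_bar function
It matches limit orders first, then stop orders, then calls the strategy's on_bar so the strategy can make its decisions, and finally passes the bar's close price to update_daily_close, which, judging by its name, records it as that day's closing price.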
def new_bar(self, bar: BarData):
    """"""
    self.bar = bar
    self.datetime = bar.datetime

    # First match the orders placed on earlier bars.
    self.cross_limit_order()
    self.cross_stop_order()
    self.strategy.on_bar(bar)

    self.update_daily_close(bar.close_price)
3. Order matching
def cross_limit_order(self):
    """
    Cross limit order with last bar/tick data.
    These are limit orders, e.g. "buy at or below a given price".
    """
    if self.mode == BacktestingMode.BAR:
        # A buy limit order fills when price trades at or below the limit,
        # so the bar's low is used to decide whether it crossed.
        long_cross_price = self.bar.low_price
        short_cross_price = self.bar.high_price
        # Special case: the limit is "buy at or below 98" and the next bar
        # opens at 95 and keeps falling. The fill is then the open price,
        # not the limit price and not the low.
        long_best_price = self.bar.open_price
        short_best_price = self.bar.open_price
    else:
        long_cross_price = self.tick.ask_price_1
        short_cross_price = self.tick.bid_price_1
        long_best_price = long_cross_price
        short_best_price = short_cross_price

    # self.active_limit_orders is a dict created in the class constructor.
    for order in list(self.active_limit_orders.values()):
        # Push order update with status "not traded" (pending).
        if order.status == Status.SUBMITTING:
            order.status = Status.NOTTRADED
            self.strategy.on_order(order)

        # Check whether limit orders can be filled.
        # A long limit order fills when: 1. the order is long; 2. the order
        # price is at or above the bar's low (long_cross_price); 3. the
        # bar's low is greater than 0.
        long_cross = (
            order.direction == Direction.LONG
            and order.price >= long_cross_price
            and long_cross_price > 0
        )

        short_cross = (
            order.direction == Direction.SHORT
            and order.price <= short_cross_price
            and short_cross_price > 0
        )

        if not long_cross and not short_cross:
            continue

        # Push order update with status "all traded" (filled).
        order.traded = order.volume
        order.status = Status.ALLTRADED
        self.strategy.on_order(order)

        self.active_limit_orders.pop(order.vt_orderid)

        # Push trade update: build the trade record.
        self.trade_count += 1

        if long_cross:
            # Fill price of the order.
            trade_price = min(order.price, long_best_price)
            pos_change = order.volume
        else:
            trade_price = max(order.price, short_best_price)
            pos_change = -order.volume

        trade = TradeData(
            symbol=order.symbol,
            exchange=order.exchange,
            orderid=order.orderid,
            tradeid=str(self.trade_count),
            direction=order.direction,
            offset=order.offset,
            price=trade_price,
            volume=order.volume,
            time=self.datetime.strftime("%H:%M:%S"),
            gateway_name=self.gateway_name,
        )
        trade.datetime = self.datetime

        self.strategy.pos += pos_change
        self.strategy.on_trade(trade)

        self.trades[trade.vt_tradeid] = trade
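The bar-mode fill rule can be condensed into a small pure function, which makes the gap-open case easy to check by hand. This is only a sketch of the rule as read from cross_limit_order. The function limit_fill_price and its string-valued direction argument are simplifications introduced for the example.

```python
from typing import Optional


def limit_fill_price(
    direction: str,
    order_price: float,
    bar_open: float,
    bar_high: float,
    bar_low: float,
) -> Optional[float]:
    """Return the simulated fill price on this bar, or None if not crossed."""
    if direction == "long":
        # Buy limit: crossed when the bar trades at or below the limit.
        crossed = order_price >= bar_low and bar_low > 0
        # A gap down through the limit fills at the better open price.
        return min(order_price, bar_open) if crossed else None
    # Sell limit: crossed when the bar trades at or above the limit.
    crossed = order_price <= bar_high and bar_high > 0
    return max(order_price, bar_open) if crossed else None


# Buy limit at 98, bar gaps down and opens at 95: filled at the open.
print(limit_fill_price("long", 98, bar_open=95, bar_high=96, bar_low=90))
# Buy limit at 98, bar opens at 100 and dips to 97: filled at the limit.
print(limit_fill_price("long", 98, bar_open=100, bar_high=101, bar_low=97))
# Buy limit at 98, bar never trades below 99: no fill.
print(limit_fill_price("long", 98, bar_open=100, bar_high=101, bar_low=99))
```

The whole order volume is filled as soon as the bar touches the limit price. The matching shown above has no partial fills and takes no account of how much volume traded at that price.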
V. buy and sell
A strategy's on_bar calls buy and sell. Looking at CtaTemplate, the four functions buy, sell, short and cover all delegate to send_order.
def buy(self, price: float, volume: float, stop: bool = False):
    """
    Send buy order to open a long position.
    """
    return self.send_order(Direction.LONG, Offset.OPEN, price, volume, stop)
So let's look at send_order in CtaTemplate:
def send_order(
    self,
    direction: Direction,
    offset: Offset,
    price: float,
    volume: float,
    stop: bool = False,
    lock: bool = False
):
    """
    Send a new order.
    """
    if self.trading:
        vt_orderids = self.cta_engine.send_order(
            self, direction, offset, price, volume, stop, lock
        )
        return vt_orderids
    else:
        return []
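Looking further, the template's constructor sets self.cta_engine = cta_engine. In live trading that object is the CtaEngine, so here is its send_order: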
def send_order(
    self,
    strategy: CtaTemplate,
    direction: Direction,
    offset: Offset,
    price: float,
    volume: float,
    stop: bool,
    lock: bool
):
    """
    """
    contract = self.main_engine.get_contract(strategy.vt_symbol)
    if not contract:
        # "Order failed, contract not found: ..."
        self.write_log(f"委托失败,找不到合约:{strategy.vt_symbol}", strategy)
        return ""

    if stop:
        if contract.stop_supported:
            return self.send_server_stop_order(strategy, contract, direction, offset, price, volume, lock)
        else:
            return self.send_local_stop_order(strategy, direction, offset, price, volume, lock)
    else:
        return self.send_limit_order(strategy, contract, direction, offset, price, volume, lock)
Both send_limit_order and send_server_stop_order turn out to call send_server_order.
def send_server_order(
    self,
    strategy: CtaTemplate,
    contract: ContractData,
    direction: Direction,
    offset: Offset,
    price: float,
    volume: float,
    type: OrderType,
    lock: bool
):
    """
    Send a new order to server.
    """
    # Create request and send order.
    original_req = OrderRequest(
        symbol=contract.symbol,
        exchange=contract.exchange,
        direction=direction,
        offset=offset,
        type=type,
        price=price,
        volume=volume,
    )

    # Convert with offset converter
    req_list = self.offset_converter.convert_order_request(original_req, lock)

    # Send Orders
    vt_orderids = []

    for req in req_list:
        vt_orderid = self.main_engine.send_order(
            req, contract.gateway_name)
        vt_orderids.append(vt_orderid)

        self.offset_converter.update_order_request(req, vt_orderid)

        # Save relationship between orderid and strategy.
        self.orderid_strategy_map[vt_orderid] = strategy
        self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid)

    return vt_orderids
And send_order in MainEngine (main_engine):
def send_order(self, req: OrderRequest, gateway_name: str):
    """
    Send new order request to a specific gateway.
    """
    gateway = self.get_gateway(gateway_name)
    if gateway:
        return gateway.send_order(req)
    else:
        return ""
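In other words, the live call chain is: buy / sell / short / cover in the strategy -> CtaTemplate.send_order -> CtaEngine.send_order -> send_limit_order or send_server_stop_order -> send_server_order -> MainEngine.send_order -> gateway.send_order. Stop orders on contracts without server-side stop support go to send_local_stop_order instead and stay local.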
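Backtesting runs the very same on_bar, so where do buy and sell tell backtesting apart from live trading? They do not need to. In BacktestingEngine.add_strategy, the self passed as the first constructor argument is the backtesting engine, so the strategy's self.cta_engine is the BacktestingEngine and self.cta_engine.send_order ends up in the backtesting engine rather than in CtaEngine.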
def add_strategy(self, strategy_class: type, setting: dict):
    """"""
    self.strategy_class = strategy_class
    self.strategy = strategy_class(
        self, strategy_class.__name__, self.vt_symbol, setting
    )
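The effect of passing self into the strategy constructor can be shown with toy classes. Everything below is hypothetical (MiniStrategy, FakeLiveEngine, FakeBacktestingEngine and the order-id formats). It only illustrates that the strategy code stays the same while the injected engine decides where the order goes, presumably the same way BacktestingEngine provides its own send_order.

```python
from typing import List


class MiniStrategy:
    """Toy stand-in for a CtaTemplate subclass."""

    def __init__(self, cta_engine, strategy_name: str, vt_symbol: str, setting: dict):
        self.cta_engine = cta_engine  # Live engine or backtesting engine.
        self.strategy_name = strategy_name
        self.vt_symbol = vt_symbol
        self.trading = True

    def buy(self, price: float, volume: float) -> List[str]:
        if not self.trading:
            return []
        # The strategy never checks which kind of engine it is talking to.
        return self.cta_engine.send_order(self, "long", "open", price, volume)


class FakeLiveEngine:
    def send_order(self, strategy, direction, offset, price, volume) -> List[str]:
        # A real engine would forward the request to a gateway here.
        return ["LIVE.1"]


class FakeBacktestingEngine:
    def __init__(self):
        self.order_count = 0

    def send_order(self, strategy, direction, offset, price, volume) -> List[str]:
        # A backtesting engine only records the order for later matching.
        self.order_count += 1
        return [f"BACKTESTING.{self.order_count}"]


live = MiniStrategy(FakeLiveEngine(), "demo", "000001.SZSE", {})
backtest = MiniStrategy(FakeBacktestingEngine(), "demo", "000001.SZSE", {})
print(live.buy(10.0, 1))
print(backtest.buy(10.0, 1))
```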
Finally, one more function: send_local_stop_order.
def send_local_stop_order(
    self,
    strategy: CtaTemplate,
    direction: Direction,
    offset: Offset,
    price: float,
    volume: float,
    lock: bool
):
    """
    Create a new local stop order.
    """
    self.stop_order_count += 1
    stop_orderid = f"{STOPORDER_PREFIX}.{self.stop_order_count}"

    stop_order = StopOrder(
        vt_symbol=strategy.vt_symbol,
        direction=direction,
        offset=offset,
        price=price,
        volume=volume,
        stop_orderid=stop_orderid,
        strategy_name=strategy.strategy_name,
        lock=lock
    )

    self.stop_orders[stop_orderid] = stop_order

    vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
    vt_orderids.add(stop_orderid)

    self.call_strategy_func(strategy, strategy.on_stop_order, stop_order)
    self.put_stop_order_event(stop_order)

    return stop_orderid
VI. calculate_result
This function computes the daily mark-to-market profit and loss and returns a DataFrame called daily_df.
def calculate_result(self):
    """"""
    self.output("开始计算逐日盯市盈亏")  # "Start calculating daily mark-to-market PnL"

    if not self.trades:
        self.output("成交记录为空,无法计算")  # "No trades, nothing to calculate"
        return

    # Add trade data into daily result.
    for trade in self.trades.values():
        d = trade.datetime.date()
        daily_result = self.daily_results[d]
        daily_result.add_trade(trade)

    # Calculate daily result by iteration.
    pre_close = 0
    start_pos = 0

    for daily_result in self.daily_results.values():
        daily_result.calculate_pnl(
            pre_close, start_pos, self.size, self.rate, self.slippage
        )

        pre_close = daily_result.close_price
        start_pos = daily_result.end_pos

    # Generate dataframe
    results = defaultdict(list)

    for daily_result in self.daily_results.values():
        for key, value in daily_result.__dict__.items():
            results[key].append(value)

    self.daily_df = DataFrame.from_dict(results).set_index("date")

    self.output("逐日盯市盈亏计算完成")  # "Daily mark-to-market PnL calculated"
    return self.daily_df
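1. self.trades
self.trades is a dictionary. Entries are added to it in cross_limit_order and cross_stop_order, keyed by vt_tradeid. Those two functions are fairly involved, so the next chapter is devoted to exploring them.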
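2. add_trade
add_trade is very simple, little more than a wrapper around append. Note that self.trades here is the list held by the daily result object, not the engine's dictionary.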
def add_trade(self, trade: TradeData):
    """"""
    self.trades.append(trade)
3. calculate_pnl
def calculate_pnl(
    self,
    pre_close: float,
    start_pos: float,
    size: int,
    rate: float,
    slippage: float,
):
    """"""
    self.pre_close = pre_close

    # Holding pnl is the pnl from holding position at day start
    self.start_pos = start_pos
    self.end_pos = start_pos
    self.holding_pnl = self.start_pos * \
        (self.close_price - self.pre_close) * size

    # Trading pnl is the pnl from new trade during the day
    self.trade_count = len(self.trades)

    for trade in self.trades:
        if trade.direction == Direction.LONG:
            pos_change = trade.volume
        else:
            pos_change = -trade.volume

        turnover = trade.price * trade.volume * size

        self.trading_pnl += pos_change * \
            (self.close_price - trade.price) * size
        self.end_pos += pos_change
        self.turnover += turnover
        self.commission += turnover * rate
        self.slippage += trade.volume * size * slippage

    # Net pnl takes account of commission and slippage cost
    self.total_pnl = self.trading_pnl + self.holding_pnl
    self.net_pnl = self.total_pnl - self.commission - self.slippage
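A worked example makes the formulas in calculate_pnl concrete. The sketch below re-implements the same arithmetic as a standalone function. The Fill dataclass, the function name daily_pnl and all the numbers are made up for illustration.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Fill:
    direction: str  # "long" or "short"
    price: float
    volume: float


def daily_pnl(
    pre_close: float,
    close: float,
    start_pos: float,
    fills: List[Fill],
    size: int,
    rate: float,
    slippage: float,
) -> Dict[str, float]:
    """Mark one day to market with the same formulas as calculate_pnl."""
    # Holding pnl: yesterday's position revalued from pre_close to close.
    holding_pnl = start_pos * (close - pre_close) * size
    trading_pnl = commission = slippage_cost = 0.0
    end_pos = start_pos
    for fill in fills:
        pos_change = fill.volume if fill.direction == "long" else -fill.volume
        turnover = fill.price * fill.volume * size
        # Trading pnl: today's fills revalued from fill price to close.
        trading_pnl += pos_change * (close - fill.price) * size
        end_pos += pos_change
        commission += turnover * rate
        slippage_cost += fill.volume * size * slippage
    total_pnl = holding_pnl + trading_pnl
    return {
        "holding_pnl": holding_pnl,
        "trading_pnl": trading_pnl,
        "end_pos": end_pos,
        "net_pnl": total_pnl - commission - slippage_cost,
    }


# Start long 1 lot, buy 2 more at 101; close moves from 100 to 102.
result = daily_pnl(
    pre_close=100, close=102, start_pos=1,
    fills=[Fill("long", 101, 2)], size=10, rate=0.0001, slippage=0.5,
)
print(result)
```

Here the holding pnl is 1 × (102 − 100) × 10 = 20, the trading pnl is 2 × (102 − 101) × 10 = 20, commission is 2020 × 0.0001 = 0.202 and slippage is 2 × 10 × 0.5 = 10, giving a net pnl of about 29.798.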
VII. calculate_statistics
This function computes the strategy's performance statistics and returns them as a dictionary:
def calculate_statistics(self, df: DataFrame = None, output=True):
    """"""
    self.output("开始计算策略统计指标")  # "Start calculating strategy statistics"

    # Check DataFrame input exterior
    if df is None:
        df = self.daily_df

    # Check for init DataFrame
    if df is None:
        # Set all statistics to 0 if no trade.
        start_date = ""
        end_date = ""
        total_days = 0
        # ......
        return_drawdown_ratio = 0
    else:
        # Calculate balance related time series data
        df["balance"] = df["net_pnl"].cumsum() + self.capital
        df["return"] = np.log(df["balance"] / df["balance"].shift(1)).fillna(0)
        df["highlevel"] = (
            df["balance"].rolling(
                min_periods=1, window=len(df), center=False).max()
        )
        df["drawdown"] = df["balance"] - df["highlevel"]
        df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100

        # Calculate statistics value
        start_date = df.index[0]
        end_date = df.index[-1]

        total_days = len(df)
        profit_days = len(df[df["net_pnl"] > 0])
        loss_days = len(df[df["net_pnl"] < 0])

        end_balance = df["balance"].iloc[-1]
        max_drawdown = df["drawdown"].min()
        max_ddpercent = df["ddpercent"].min()

        total_net_pnl = df["net_pnl"].sum()
        daily_net_pnl = total_net_pnl / total_days

        total_commission = df["commission"].sum()
        daily_commission = total_commission / total_days

        total_slippage = df["slippage"].sum()
        daily_slippage = total_slippage / total_days

        total_turnover = df["turnover"].sum()
        daily_turnover = total_turnover / total_days

        total_trade_count = df["trade_count"].sum()
        daily_trade_count = total_trade_count / total_days

        total_return = (end_balance / self.capital - 1) * 100
        annual_return = total_return / total_days * 240
        daily_return = df["return"].mean() * 100
        return_std = df["return"].std() * 100

        if return_std:
            sharpe_ratio = daily_return / return_std * np.sqrt(240)
        else:
            sharpe_ratio = 0

        return_drawdown_ratio = -total_return / max_ddpercent

    # Output (the Chinese labels name the statistic printed on each line)
    if output:
        self.output("-" * 30)
        self.output(f"首个交易日:\t{start_date}")
        self.output(f"最后交易日:\t{end_date}")

        self.output(f"总交易日:\t{total_days}")
        self.output(f"盈利交易日:\t{profit_days}")
        self.output(f"亏损交易日:\t{loss_days}")

        self.output(f"起始资金:\t{self.capital:,.2f}")
        self.output(f"结束资金:\t{end_balance:,.2f}")

        self.output(f"总收益率:\t{total_return:,.2f}%")
        self.output(f"年化收益:\t{annual_return:,.2f}%")
        self.output(f"最大回撤: \t{max_drawdown:,.2f}")
        self.output(f"百分比最大回撤: {max_ddpercent:,.2f}%")

        self.output(f"总盈亏:\t{total_net_pnl:,.2f}")
        self.output(f"总手续费:\t{total_commission:,.2f}")
        self.output(f"总滑点:\t{total_slippage:,.2f}")
        self.output(f"总成交金额:\t{total_turnover:,.2f}")
        self.output(f"总成交笔数:\t{total_trade_count}")

        self.output(f"日均盈亏:\t{daily_net_pnl:,.2f}")
        self.output(f"日均手续费:\t{daily_commission:,.2f}")
        self.output(f"日均滑点:\t{daily_slippage:,.2f}")
        self.output(f"日均成交金额:\t{daily_turnover:,.2f}")
        self.output(f"日均成交笔数:\t{daily_trade_count}")

        self.output(f"日均收益率:\t{daily_return:,.2f}%")
        self.output(f"收益标准差:\t{return_std:,.2f}%")
        self.output(f"Sharpe Ratio:\t{sharpe_ratio:,.2f}")
        self.output(f"收益回撤比:\t{return_drawdown_ratio:,.2f}")

    statistics = {
        "start_date": start_date,
        "end_date": end_date,
        "total_days": total_days,
        "profit_days": profit_days,
        "loss_days": loss_days,
        "capital": self.capital,
        "end_balance": end_balance,
        "max_drawdown": max_drawdown,
        "max_ddpercent": max_ddpercent,
        "total_net_pnl": total_net_pnl,
        "daily_net_pnl": daily_net_pnl,
        "total_commission": total_commission,
        "daily_commission": daily_commission,
        "total_slippage": total_slippage,
        "daily_slippage": daily_slippage,
        "total_turnover": total_turnover,
        "daily_turnover": daily_turnover,
        "total_trade_count": total_trade_count,
        "daily_trade_count": daily_trade_count,
        "total_return": total_return,
        "annual_return": annual_return,
        "daily_return": daily_return,
        "return_std": return_std,
        "sharpe_ratio": sharpe_ratio,
        "return_drawdown_ratio": return_drawdown_ratio,
    }

    return statistics
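The balance, drawdown and Sharpe calculations can be followed on a three-day toy series without pandas. The sketch below mirrors the formulas in calculate_statistics using only the standard library. The function name balance_stats and the sample numbers are invented, and statistics.stdev is used because it is the sample standard deviation, the same convention as the pandas std default.

```python
import math
from statistics import mean, stdev
from typing import Dict, List


def balance_stats(net_pnls: List[float], capital: float, annual_days: int = 240) -> Dict[str, float]:
    """Recompute a few statistics from a list of daily net pnl values."""
    # balance = cumulative net pnl + starting capital
    balances = []
    running = capital
    for pnl in net_pnls:
        running += pnl
        balances.append(running)

    # Daily log return; the first day has no previous balance, so it is 0.
    returns = [0.0] + [math.log(b / a) for a, b in zip(balances, balances[1:])]

    # Drawdown is measured against the running maximum of the balance.
    high = float("-inf")
    drawdowns, ddpercents = [], []
    for balance in balances:
        high = max(high, balance)
        drawdowns.append(balance - high)
        ddpercents.append((balance - high) / high * 100)

    total_return = (balances[-1] / capital - 1) * 100
    daily_return = mean(returns) * 100
    return_std = stdev(returns) * 100
    sharpe = daily_return / return_std * math.sqrt(annual_days) if return_std else 0
    return {
        "end_balance": balances[-1],
        "max_drawdown": min(drawdowns),
        "max_ddpercent": min(ddpercents),
        "total_return": total_return,
        "annual_return": total_return / len(net_pnls) * annual_days,
        "sharpe_ratio": sharpe,
    }


stats = balance_stats([100, -50, 50], capital=1000)
print(stats)
```

The balances are 1100, 1050 and 1100, so the maximum drawdown is −50, or about −4.55% of the 1100 peak, and the total return is 10%. The annualized figure is a simple linear scaling to 240 trading days rather than a compounded one.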
VIII. show_chart
Last comes the function that draws the charts.
def show_chart(self, df: DataFrame = None):
    """"""
    # Check DataFrame input exterior
    if df is None:
        df = self.daily_df

    # Check for init DataFrame
    if df is None:
        return

    plt.figure(figsize=(10, 16))

    balance_plot = plt.subplot(4, 1, 1)  # 4 rows x 1 column, first plot
    balance_plot.set_title("Balance")
    df["balance"].plot(legend=True)

    drawdown_plot = plt.subplot(4, 1, 2)
    drawdown_plot.set_title("Drawdown")
    drawdown_plot.fill_between(range(len(df)), df["drawdown"].values)

    pnl_plot = plt.subplot(4, 1, 3)
    pnl_plot.set_title("Daily Pnl")
    df["net_pnl"].plot(kind="bar", legend=False, grid=False, xticks=[])

    distribution_plot = plt.subplot(4, 1, 4)
    distribution_plot.set_title("Daily Pnl Distribution")
    df["net_pnl"].hist(bins=50)

    plt.show()