一、简介
本项目是对论文《Stock Trading with Recurrent Reinforcement Learning (RRL)》的代码重现。
论文URL:http://cs229.stanford.edu/proj2006/Molina-StockTradingWithRecurrentReinforcementLearning.pdf
二、结论
先看结论吧:
这种方法的主要困难在于,某些股价事件本身并不呈现出可供学习的结构。从上面的第二个示例中可以看出,强化学习模型无法预测股价的急剧下跌,在这种情况下与人类交易者一样脆弱。如果与能够预测这种急剧下跌的机制结合使用,可能会更有效。该模型还可以在其他方面作出修改,比如将成交量纳入进来,作为预测涨跌的特征。
另外,可以尝试使用固定交易成本以及降低交易频率。例如,可以创建一个模型,该模型从长时间的数据中学习,但只能定期做出决策。这将反映一个散户交易者以固定交易成本参与较小数量交易的情况。因为对于散户而言,以固定的交易成本在每个期间进行交易都太昂贵了,所以具有定期交易策略的模型对于此类用户更为可行。
三、小知识
1.夏普比率(Sharpe Ratio)的计算方法
四、代码
# -*- coding: utf-8 -*-
"""Recurrent reinforcement learning (RRL) trading agent and a demo script.

The agent keeps a weight vector ``w`` and produces a trading signal
``F = tanh(w . x)`` for each step, where ``x`` holds a bias term, the last
``M`` price differences and the previously computed signal.  Weights are
updated by gradient ascent on the Sharpe ratio of the per-step rewards.
"""
import time
import pickle
import numpy as np
import pandas as pd
from datetime import datetime as dt
import matplotlib.pyplot as plt


def _plot_term(t, ini_agent, opt_agent, legend_loc, out_path):
    """Save a 3-panel figure (price, signal F, cumulative reward) to *out_path*.

    ``ini_agent`` is drawn in blue ("With initial weights") and ``opt_agent``
    in red ("With optimized weights").  Both must already have had
    ``calc_dSdw()`` run so that ``F`` and ``sumR`` are populated.
    """
    n = opt_agent.T
    fig, ax = plt.subplots(nrows=3, figsize=(15, 10))

    ax[0].plot(t, opt_agent.p[:n])
    ax[0].set_xlabel("time")
    ax[0].set_ylabel("USDJPY")
    ax[0].grid(True)

    ax[1].plot(t, ini_agent.F[:n], color="blue", label="With initial weights")
    ax[1].plot(t, opt_agent.F[:n], color="red", label="With optimized weights")
    ax[1].set_xlabel("time")
    ax[1].set_ylabel("F")
    ax[1].legend(loc=legend_loc)
    ax[1].grid(True)

    ax[2].plot(t, ini_agent.sumR, color="blue", label="With initial weights")
    ax[2].plot(t, opt_agent.sumR, color="red", label="With optimized weights")
    ax[2].set_xlabel("time")
    ax[2].set_ylabel("Sum of reward[yen]")
    ax[2].legend(loc=legend_loc)
    ax[2].grid(True)

    plt.savefig(out_path, dpi=300)
    # Close (not just clear) so pyplot drops its reference to the figure.
    plt.close(fig)


def main():
    """Train an agent on one term and evaluate its weights on the next term.

    Reads ``../data/USDJPY30.csv`` and writes three PNG files into the
    current directory: the Sharpe-ratio learning curve, the training-term
    plot and the prediction-term plot.
    """
    fname = "../data/USDJPY30.csv"
    init_t = 6000

    T = 1000
    M = 200
    mu = 10000
    sigma = 0.04
    rho = 1.0
    n_epoch = 10000

    # RRL agent with initial weight.
    ini_rrl = TradingRRL(T, M, init_t, mu, sigma, rho, n_epoch)
    ini_rrl.load_csv(fname)
    ini_rrl.set_t_p_r()
    ini_rrl.calc_dSdw()
    # RRL agent for training
    rrl = TradingRRL(T, M, init_t, mu, sigma, rho, n_epoch)
    rrl.all_t = ini_rrl.all_t
    rrl.all_p = ini_rrl.all_p
    rrl.set_t_p_r()
    rrl.fit()

    # Plot results.
    # Training for initial term T.
    plt.plot(range(len(rrl.epoch_S)), rrl.epoch_S)
    plt.title("Sharp's ratio optimization")
    plt.xlabel("Epoch times")
    plt.ylabel("Sharp's ratio")
    plt.grid(True)
    plt.savefig("sharp's ratio optimization.png", dpi=300)
    plt.close()

    t = np.linspace(1, rrl.T, rrl.T)[::-1]
    _plot_term(t, ini_rrl, rrl, "upper left", "rrl_train.png")

    # Prediction for next term T with optimized weight.
    # RRL agent with initial weight.
    ini_rrl_f = TradingRRL(T, M, init_t-T, mu, sigma, rho, n_epoch)
    ini_rrl_f.all_t = ini_rrl.all_t
    ini_rrl_f.all_p = ini_rrl.all_p
    ini_rrl_f.set_t_p_r()
    ini_rrl_f.calc_dSdw()
    # RRL agent with optimized weight.
    rrl_f = TradingRRL(T, M, init_t-T, mu, sigma, rho, n_epoch)
    rrl_f.all_t = ini_rrl.all_t
    rrl_f.all_p = ini_rrl.all_p
    rrl_f.set_t_p_r()
    rrl_f.w = rrl.w
    rrl_f.calc_dSdw()

    t_f = np.linspace(rrl.T+1, rrl.T+rrl.T, rrl.T)[::-1]
    _plot_term(t_f, ini_rrl_f, rrl_f, "lower right", "rrl_prediction.png")


class TradingRRL(object):
    """Trading agent trained by gradient ascent on the Sharpe ratio.

    Typical use: fill ``all_t``/``all_p`` (via ``load_csv`` or by assignment),
    call ``set_t_p_r()`` to select the working window, then ``fit()`` to
    train or ``calc_dSdw()`` to evaluate the current weights.

    Args:
        T: Number of steps in the evaluated window.
        M: Number of past price differences fed into each input vector.
        init_t: Start index of the window inside ``all_t``/``all_p``.
        mu: Multiplier applied to every per-step reward.
        sigma: Cost factor applied to the absolute change of the signal.
        rho: Step size of the weight update.
        n_epoch: Number of update iterations performed by ``fit()``.
    """

    def __init__(self, T=1000, M=200, init_t=10000, mu=10000, sigma=0.04, rho=1.0, n_epoch=10000):
        self.T = T
        self.M = M
        self.init_t = init_t
        self.mu = mu
        self.sigma = sigma
        self.rho = rho
        # Full series; filled by load_csv() or assigned by the caller.
        self.all_t = None
        self.all_p = None
        # Working window; filled by set_t_p_r().
        self.t = None
        self.p = None
        self.r = None
        # Row i of x is [1, r[i], ..., r[i+M-1], F[i+1]].
        self.x = np.zeros([T, M+2])
        # F has one extra trailing slot that stays 0 and seeds the recursion.
        self.F = np.zeros(T+1)
        self.R = np.zeros(T)
        self.w = np.ones(M+2)
        self.w_opt = np.ones(M+2)
        # Sharpe ratio recorded at every epoch of fit().
        self.epoch_S = np.empty(0)
        self.n_epoch = n_epoch
        self.progress_period = 100
        self.q_threshold = 0.7

    def load_csv(self, fname):
        """Load timestamps and prices from a header-less CSV file.

        Columns 0 and 1 are joined with a space and parsed as
        ``%Y.%m.%d %H:%M``; column 5 is used as the price.  Both series are
        stored in reversed row order.
        NOTE(review): presumably the file is oldest-first so index 0 ends up
        being the most recent bar -- confirm against the data file.
        """
        tmp = pd.read_csv(fname, header=None)
        tmp_tstr = tmp[0] + " " + tmp[1]
        tmp_t = [dt.strptime(stamp, '%Y.%m.%d %H:%M') for stamp in tmp_tstr]
        tmp_p = list(tmp[5])
        self.all_t = np.array(tmp_t[::-1])
        self.all_p = np.array(tmp_p[::-1])

    def quant(self, f):
        """Return sign(f) with entries whose magnitude is below ``q_threshold`` set to 0.

        The input array is not modified.
        """
        fc = f.copy()
        fc[np.abs(fc) < self.q_threshold] = 0
        return np.sign(fc)

    def set_t_p_r(self):
        """Slice the working window (T+M+1 samples from ``init_t``) and derive ``r``.

        ``r[i] = p[i] - p[i+1]``.  Requires ``all_t`` and ``all_p`` to be set.
        """
        stop = self.init_t + self.T + self.M + 1
        self.t = self.all_t[self.init_t:stop]
        self.p = self.all_p[self.init_t:stop]
        self.r = -np.diff(self.p)

    def set_x_F(self):
        """Build every input row ``x[i]`` and signal ``F[i]`` for the current weights.

        The recursion runs from index T-1 down to 0 because ``x[i]`` contains
        ``F[i+1]``, the signal computed in the preceding iteration.
        """
        last = self.M + 1
        for i in range(self.T - 1, -1, -1):
            row = self.x[i]
            row[0] = 1.0
            row[1:last] = self.r[i:i + self.M]
            row[last] = self.F[i + 1]
            self.F[i] = np.tanh(np.dot(self.w, row))

    def calc_R(self):
        """Compute per-step rewards ``R`` from the signals and price differences.

        ``R[i] = mu * (F[i+1] * r[i] - sigma * |F[i] - F[i+1]|)``.
        """
        self.R = self.mu * (self.F[1:] * self.r[:self.T] - self.sigma * np.abs(-np.diff(self.F)))

    def calc_sumR(self):
        """Compute reversed cumulative sums of ``R`` and ``R**2``.

        Element 0 of ``sumR``/``sumR2`` is the total over the whole window.
        """
        self.sumR = np.cumsum(self.R[::-1])[::-1]
        self.sumR2 = np.cumsum((self.R**2)[::-1])[::-1]

    def calc_dSdw(self):
        """Evaluate the Sharpe ratio ``S`` and its gradient ``dSdw`` w.r.t. the weights.

        Recomputes ``x``, ``F``, ``R`` and the cumulative sums first.  With
        ``A = mean(R)`` and ``B = mean(R**2)`` the ratio is
        ``S = A / sqrt(B - A**2)``.  The result is NaN/inf when ``A`` is 0 or
        ``B == A**2`` (no guard is applied).
        """
        self.set_x_F()
        self.calc_R()
        self.calc_sumR()
        self.A = self.sumR[0] / self.T
        self.B = self.sumR2[0] / self.T
        self.S = self.A / np.sqrt(self.B - self.A**2)
        # Partial derivatives of S = A * (B - A^2)^(-1/2).
        self.dSdA = self.S * (1 + self.S**2) / self.A
        self.dSdB = -self.S**3 / 2 / self.A**2
        self.dAdR = 1.0 / self.T
        self.dBdR = 2.0 / self.T * self.R
        # Derivatives of R[i] w.r.t. F[i] (dRdF) and F[i+1] (dRdFp).
        self.dRdF = -self.mu * self.sigma * np.sign(-np.diff(self.F))
        self.dRdFp = self.mu * self.r[:self.T] + self.mu * self.sigma * np.sign(-np.diff(self.F))
        self.dFdw = np.zeros(self.M+2)
        self.dFpdw = np.zeros(self.M+2)
        self.dSdw = np.zeros(self.M+2)
        w_feedback = self.w[self.M + 1]
        for i in range(self.T - 1, -1, -1):
            if i != self.T - 1:
                # Gradient of the previously processed signal F[i+1].
                self.dFpdw = self.dFdw.copy()
            self.dFdw = (1 - self.F[i]**2) * (self.x[i] + w_feedback * self.dFpdw)
            self.dSdw += (self.dSdA * self.dAdR + self.dSdB * self.dBdR[i]) * (self.dRdF[i] * self.dFdw + self.dRdFp[i] * self.dFpdw)

    def update_w(self):
        """Take one gradient-ascent step: ``w += rho * dSdw`` (in place)."""
        self.w += self.rho * self.dSdw

    def _print_progress(self, e_index, pre_epoch_times, tic):
        """Print the epoch counter, current Sharpe ratio and seconds since *tic*."""
        toc = time.perf_counter()
        print("Epoch: " + str(e_index + pre_epoch_times + 1) + "/" + str(self.n_epoch + pre_epoch_times) + ". Shape's ratio: " + str(self.S) + ". Elapsed time: " + str(toc-tic) + " sec.")

    def fit(self):
        """Run ``n_epoch`` weight updates and keep the best weights seen.

        Appends the Sharpe ratio of every epoch to ``epoch_S``.  On return
        ``w`` holds a copy of the weights with the highest recorded Sharpe
        ratio (``S_opt``) and all derived arrays are recomputed for them.
        """
        pre_epoch_times = len(self.epoch_S)

        self.calc_dSdw()
        print("Epoch loop start. Initial sharp's ratio is " + str(self.S) + ".")
        self.S_opt = self.S

        # time.clock() was removed in Python 3.8; perf_counter() replaces it.
        tic = time.perf_counter()
        # Pre-set so the final report still works when n_epoch is 0.
        e_index = -1
        for e_index in range(self.n_epoch):
            self.calc_dSdw()
            if self.S > self.S_opt:
                self.S_opt = self.S
                self.w_opt = self.w.copy()
            self.epoch_S = np.append(self.epoch_S, self.S)
            self.update_w()
            if e_index % self.progress_period == self.progress_period - 1:
                self._print_progress(e_index, pre_epoch_times, tic)
        self._print_progress(e_index, pre_epoch_times, tic)
        self.w = self.w_opt.copy()
        self.calc_dSdw()
        print("Epoch loop end. Optimized sharp's ratio is " + str(self.S_opt) + ".")

    def save_weight(self):
        """Write ``w`` to ``w.csv`` and ``epoch_S`` to ``epoch_S.csv`` (no header, no index)."""
        pd.DataFrame(self.w).to_csv("w.csv", header=False, index=False)
        pd.DataFrame(self.epoch_S).to_csv("epoch_S.csv", header=False, index=False)

    def load_weight(self):
        """Replace ``w`` with the single column stored in ``w.csv``."""
        tmp = pd.read_csv("w.csv", header=None)
        self.w = tmp.T.values[0]


def plot_hist(n_tick, R):
    """Show a bar chart of how many values of *R* fall into each of *n_tick* bins.

    Bin ``i`` is centred at ``min(R) + i * tick`` with
    ``tick = (max(R) - min(R)) / n_tick`` and covers the half-open interval
    ``(centre - tick/2, centre + tick/2]``.
    NOTE(review): with these centres the last bin ends at ``max(R) - tick/2``,
    so the largest values are not counted -- confirm whether that is intended.
    """
    values = np.asarray(R)
    lowest = min(R)
    tick = (max(R) - lowest) / n_tick
    steps = np.arange(n_tick)
    lower = lowest - tick * 0.5 + steps * tick
    upper = lowest + tick * 0.5 + steps * tick
    centers = lowest + steps * tick
    counts = [
        int(np.count_nonzero((lo < values) & (values <= hi)))
        for lo, hi in zip(lower, upper)
    ]
    plt.bar(centers, counts, width=tick)
    plt.grid()
    plt.show()


if __name__ == "__main__":
    main()
运行截图:
可参考:
https://zhuanlan.zhihu.com/p/36632686
代码地址:https://github.com/darden1/tradingrrl