利用隐马尔可夫模型预测股票价格(附代码)
隐马尔可夫模型(HMM)是状态空间模型的特殊情况,其中潜在变量是离散和多项变量。从图形表示角度,我们可以将 HMM 看作一个双随机过程,包括一个你无法直接观察到的隐藏随机马尔可夫过程(潜在变量)和另一个随机过程,该过程在第一个过程中会产生一系列观察(observation)。
HMM 能够预测和分析基于时间的现象。因此,它们在语音识别,自然语言处理和金融市场预测等领域非常有用。在本文中,我们会讲解 HMM 在金融市场分析领域的应用,主要是股价预测。
由于许多大公司的明显兴趣,过去股市预测一直是较为活跃的研究领域之一。历史上,各种机器学习算法也被应用到这方面,并取得了不同程度的成功。然而,由于具有不稳定性、季节性和不可预测的特点,股价预测仍然受到大量因素的限制。仅仅根据以前的股价数据预测更是是一项更具挑战性的任务,因为没有考虑一些边缘因素。
HMM 能够根据有序的观察数据对隐藏状态转换进行建模。股票预测的问题也可以看作遵循相同的模式。股票的价格取决于众多因素,这些因素通常对投资者而言是隐形的(隐藏变量)。基础因素之间的转换会随着公司政策和决策,财务状况和管理决策而变化,这些因素都会影响股票的价格(观察数据)。因此,HMM 天然适合价格预测问题。
现在,我们可以使用 HMM 预测 Alphabet Inc.(GOOGL),Facebook(FB)和 Apple Inc.(AAPL)等公司的股价来测试一下。
收集股价数据
使用 Pystock 数据 获取历史股价数据。每天,在美国股票交易开篇之前,PyStock 会爬取股票价格和金融报告,并向 GitHub 仓库推送数据,比如某只股票前一天的开盘价、收盘价、最高价和最低价。该数据以天为单位,意思是没有以小时或分钟层面的数据。
下载一年的 Pystock 数据,因为数据集很大,我们写一个 Python 脚本来加载数据,并同时运行程序下载三个不同年份的数据:
"""
Usage: get_data.py --year=<year>
"""
import requests
import os
from docopt import docopt
# docopt(http://docopt.org/)能以简单的方式解析命令行参数
args = docopt(doc=__doc__, argv=None,
help=True, version=None,
options_first=False)
year = args['--year']
# 如果没有展示,则创建目录
year_directory_name = 'data/{year}'.format(year=year)
if not os.path.exists(year_directory_name):
os.makedirs(year_directory_name)
# 获取对应年份的文件列表
year_data_files = requests.get(
'http://data.pystock.com/{year}/index.txt'.format(year=year)
).text.strip().split('\n')
for data_file_name in year_data_files:
file_location = '{year_directory_name}/{data_file_name}'.format(
year_directory_name=year_directory_name,
data_file_name=data_file_name)
with open(file_location, 'wb+') as data_file:
print('>>> Downloading \t {file_location}'.format(file_location=file_location))
data_file_content = requests.get(
'http://data.pystock.com/{year}/{data_file_name}'.format(year=year, data_file_name=data_file_name)
).content
print('<<< Download Completed \t {file_location}'.format(file_location=file_location))
data_file.write(data_file_content)
运行下列脚本同时下载三年的数据:
python get_data.py --year 2015
python get_data.py --year 2016
python get_data.py --year 2017
下载数据后,通过组合对应所有年份的数据,获取前面所述每只股票的所有数据:
"""
Usage: parse_data.py --company=<company>
"""
import os
import tarfile
import pandas as pd
from pandas import errors as pd_errors
from functools import reduce
from docopt import docopt
args = docopt(doc=__doc__, argv=None,
help=True, version=None,
options_first=False)
years = [2015, 2016, 2017]
company = args['--company']
# 获取数据文件列表
data_files_list = []
for year in years:
year_directory = 'data/{year}'.format(year=year)
for file in os.listdir(year_directory):
data_files_list.append('{year_directory}/{file}'.format(year_directory=year_directory, file=file))
def parse_data(file_name, company_symbol):
"""
Returns data for the corresponding company
:param file_name: name of the tar file
:param company_symbol: company symbol
:type file_name: str
:type company_symbol: str
:return: dataframe for the corresponding company data
:rtype: pd.DataFrame
"""
tar = tarfile.open(file_name)
try:
price_report = pd.read_csv(tar.extractfile('prices.csv'))
company_price_data = price_report[price_report['symbol'] == company_symbol]
return company_price_data
except (KeyError, pd_errors.EmptyDataError):
return pd.DataFrame()
# 获得给定公司的完整数据
company_data = reduce(lambda df, file_name: df.append(parse_data(file_name, company)),
data_files_list,
pd.DataFrame())
company_data = company_data.sort_values(by=['date'])
# 若不存在,则为公司数据创建文件夹
if not os.path.exists('data/company_data'):
os.makedirs('data/company_data')
# 将数据写到CSV 文件中
company_data.to_csv('data/company_data/{company}.csv'.format(company=company),
columns=['date', 'open', 'high', 'low', 'close', 'volume', 'adj_close'],
index=False)
运行以下脚本,创建包含 GOOGL,FB 和 AAPL 股票的所有历史数据的.csv文件:
python parse_data.py --company GOOGL
python parse_data.py --company FB
python parse_data.py --company AAPL
股价预测的特征
我们获取的每天的数据特征非常有限,也就是当天股票的开盘价、收盘价、股票的最高价以及最低价。就用它们来计算股票价格。在给定当天的开盘价和之前几天的数据的情况下,我们可以计算一天的收盘价。预测模型会有d天的延迟时间。
现在,创建一个名为 StockPredictor 的预测模型,它将包含在给定日期内预测给定公司股票价格的所有逻辑。
我们不是直接使用股票的开盘价、收盘价、低价和高价,而是提取用于训练HMM的每只股票的分数变化。定义这些参数如下:
对于股票价格预测模型 HMM,可以将单个观察表示为这些参数的向量,即 Xt = <fracchange,frachigh,fraclow>:
import pandas as pd
class StockPredictor(object):
def __init__(self, company, n_latency_days=10):
self._init_logger()
self.company = company
self.n_latency_days = n_latency_days
self.data = pd.read_csv(
'data/company_data/{company}.csv'.format(company=self.company))
def _init_logger(self):
self._logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
self._logger.addHandler(handler)
self._logger.setLevel(logging.DEBUG)
@staticmethod
def _extract_features(data):
open_price = np.array(data['open'])
close_price = np.array(data['close'])
high_price = np.array(data['high'])
low_price = np.array(data['low'])
# 计算收盘价、高价和低价的分数变化
# which would be used a feature
frac_change = (close_price - open_price) / open_price
frac_high = (high_price - open_price) / open_price
frac_low = (open_price - low_price) / open_price
return np.column_stack((frac_change, frac_high, frac_low))
# 预测GOOGL股价
stock_predictor = StockPredictor(company='GOOGL')
使用 HMM 预测价格
预测价格的第一步是训练 HMM,用来从给定序列的观察中计算参数。由于观察是连续随机变量的向量,因此假设发射概率分布是连续的。为简单起见,假设它是具有参数(μ和Σ)的多项式高斯分布。因此,我们必须确定转移矩阵的以下参数,A,先验概率 π,以及 μ 和 Σ,它们代表多项式高斯分布。
现在,假设有四个隐藏状态。在接下来的部分中,我们需要研究找到最佳隐藏状态数的方法。使用 hmmlearn 包提供的 GaussianHMM 类作为 HMM,并使用它提供的 fit() 方法执行参数估计:
from hmmlearn.hmm import GaussianHMM
class StockPredictor(object):
def __init__(self, company, n_latency_days=10, n_hidden_states=4):
self._init_logger()
self.company = company
self.n_latency_days = n_latency_days
self.hmm = GaussianHMM(n_components=n_hidden_states)
self.data = pd.read_csv(
'data/company_data/{company}.csv'.format(company=self.company))
def fit(self):
self._logger.info('>>> Extracting Features')
feature_vector = StockPredictor._extract_features(self.data)
self._logger.info('Features extraction Completed <<<')
self.hmm.fit(feature_vector)
在机器学习中,我们将整个数据集划分为两个类别。第一个数据集是训练集,用来训练模型。第二个数据集是测试集,用来为模型提供无偏见的评估。将训练数据集从测试集中分离出来,能够防止出现过拟合。所以在这个案例中,我们将数据集分为两部分:train_data 用来训练模型,test_data 用来评估模型。要完成这一步,我们使用 sklearn.model_selection 模块提供的 train_test_split 方法:
from sklearn.model_selection import train_test_split
class StockPredictor(object):
def __init__(self, company, test_size=0.33,
n_latency_days=10, n_hidden_states=4):
self._init_logger()
self.company = company
self.n_latency_days = n_latency_days
self.hmm = GaussianHMM(n_components=n_hidden_states)
self._split_train_test_data(test_size)
def _split_train_test_data(self, test_size):
data = pd.read_csv(
'data/company_data/{company}.csv'.format(company=self.company))
_train_data, test_data = train_test_split(
data, test_size=test_size, shuffle=False)
self._train_data = _train_data
self._test_data = test_data
def fit(self):
self._logger.info('>>> Extracting Features')
feature_vector = StockPredictor._extract_features(self._train_data)
self._logger.info('Features extraction Completed <<<')
self.hmm.fit(feature_vector)
Train_test_split 能将数组或矩阵划分为随机训练子集和测试子集。由于我们可以用序列数据训练 HMM,随意不能随机划分数据。将 shuffle=False 作为参数传递,能够防止随机划分测试和训练数据。
等训练完模型后,我们需要预测股票的收盘价格。前面说过,我们想根据已知开盘价,预测股票某一天的收盘价。这意味着如果我们能预测某一天的 fracchange,就能计算出收盘价,如下所示:
这样以来,问题就变成了根据 t 天,x1,···,xt 的观察数据和 HMM 的参数
来计算一天的 Xt+1 = < fracchange, frachigh, fraclow > 观察向量,就是找到将 P(Xt+1|X1,…,Xt,θ) 的后验概率最大化的 Xt+1 的值:
一旦从最大化方程中删除所有与 Xt+1 的参数后,剩下的问题就是找到 Xt+1 的值,它会优化 P(X1,…,Xt+1|θ) 的概率。如果你假设 fracchange 是一个连续变量,那么这个问题的优化计算起来就非常难了。所以将这些分数变化划分为两个有限变量之间的离散值,找出一组能够将概率 P(X1,…,Xt+1|θ) 最大化的分数变化 < fracchange, frachigh, fraclow >:
所以,使用前面的离散值集,运行 (20 x 10 x 10 =) 2,000 次:
def _compute_all_possible_outcomes(self, n_steps_frac_change,
n_steps_frac_high, n_steps_frac_low):
frac_change_range = np.linspace(-0.1, 0.1, n_steps_frac_change)
frac_high_range = np.linspace(0, 0.1, n_steps_frac_high)
frac_low_range = np.linspace(0, 0.1, n_steps_frac_low)
self._possible_outcomes = np.array(list(itertools.product(
frac_change_range, frac_high_range, frac_low_range)))
现在,应用该方法来预测收盘价,如下所示:
def _get_most_probable_outcome(self, day_index):
previous_data_start_index = max(0, day_index - self.n_latency_days)
previous_data_end_index = max(0, day_index - 1)
previous_data = self._test_data.iloc[previous_data_end_index: previous_data_start_index]
previous_data_features = StockPredictor._extract_features(
previous_data)
outcome_score = []
for possible_outcome in self._possible_outcomes:
total_data = np.row_stack(
(previous_data_features, possible_outcome))
outcome_score.append(self.hmm.score(total_data))
most_probable_outcome = self._possible_outcomes[np.argmax(
outcome_score)]
return most_probable_outcome
def predict_close_price(self, day_index):
open_price = self._test_data.iloc[day_index]['open']
predicted_frac_change, _, _ = self._get_most_probable_outcome(
day_index)
return open_price * (1 + predicted_frac_change)
预测某些日期的收盘价,并绘制出价格曲线:
"""
Usage: analyse_data.py --company=<company>
"""
import warnings
import logging
import itertools
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from hmmlearn.hmm import GaussianHMM
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from docopt import docopt
args = docopt(doc=__doc__, argv=None, help=True,
version=None, options_first=False)
# Supress warning in hmmlearn
warnings.filterwarnings("ignore")
# 将绘图风格改为ggplot
plt.style.use('ggplot')
class StockPredictor(object):
def __init__(self, company, test_size=0.33,
n_hidden_states=4, n_latency_days=10,
n_steps_frac_change=50, n_steps_frac_high=10,
n_steps_frac_low=10):
self._init_logger()
self.company = company
self.n_latency_days = n_latency_days
self.hmm = GaussianHMM(n_components=n_hidden_states)
self._split_train_test_data(test_size)
self._compute_all_possible_outcomes(
n_steps_frac_change, n_steps_frac_high, n_steps_frac_low)
def _init_logger(self):
self._logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
self._logger.addHandler(handler)
self._logger.setLevel(logging.DEBUG)
def _split_train_test_data(self, test_size):
data = pd.read_csv(
'data/company_data/{company}.csv'.format(company=self.company))
_train_data, test_data = train_test_split(
data, test_size=test_size, shuffle=False)
self._train_data = _train_data
self._test_data = test_data
@staticmethod
def _extract_features(data):
open_price = np.array(data['open'])
close_price = np.array(data['close'])
high_price = np.array(data['high'])
low_price = np.array(data['low'])
# 计算收盘价、高价和低价的分数变化
# 这会用到一个特征
frac_change = (close_price - open_price) / open_price
frac_high = (high_price - open_price) / open_price
frac_low = (open_price - low_price) / open_price
return np.column_stack((frac_change, frac_high, frac_low))
def fit(self):
self._logger.info('>>> Extracting Features')
feature_vector = StockPredictor._extract_features(self._train_data)
self._logger.info('Features extraction Completed <<<')
self.hmm.fit(feature_vector)
def _compute_all_possible_outcomes(self, n_steps_frac_change,
n_steps_frac_high, n_steps_frac_low):
frac_change_range = np.linspace(-0.1, 0.1, n_steps_frac_change)
frac_high_range = np.linspace(0, 0.1, n_steps_frac_high)
frac_low_range = np.linspace(0, 0.1, n_steps_frac_low)
self._possible_outcomes = np.array(list(itertools.product(
frac_change_range, frac_high_range, frac_low_range)))
def _get_most_probable_outcome(self, day_index):
previous_data_start_index = max(0, day_index - self.n_latency_days)
previous_data_end_index = max(0, day_index - 1)
previous_data = self._test_data.iloc[previous_data_end_index: previous_data_start_index]
previous_data_features = StockPredictor._extract_features(
previous_data)
outcome_score = []
for possible_outcome in self._possible_outcomes:
total_data = np.row_stack(
(previous_data_features, possible_outcome))
outcome_score.append(self.hmm.score(total_data))
most_probable_outcome = self._possible_outcomes[np.argmax(
outcome_score)]
return most_probable_outcome
def predict_close_price(self, day_index):
open_price = self._test_data.iloc[day_index]['open']
predicted_frac_change, _, _ = self._get_most_probable_outcome(
day_index)
return open_price * (1 + predicted_frac_change)
def predict_close_prices_for_days(self, days, with_plot=False):
predicted_close_prices = []
for day_index in tqdm(range(days)):
predicted_close_prices.append(self.predict_close_price(day_index))
if with_plot:
test_data = self._test_data[0: days]
days = np.array(test_data['date'], dtype="datetime64[ms]")
actual_close_prices = test_data['close']
fig = plt.figure()
axes = fig.add_subplot(111)
axes.plot(days, actual_close_prices, 'bo-', label="actual")
axes.plot(days, predicted_close_prices, 'r+-', label="predicted")
axes.set_title('{company}'.format(company=self.company))
fig.autofmt_xdate()
plt.legend()
plt.show()
return predicted_close_prices
stock_predictor = StockPredictor(company=args['--company'])
stock_predictor.fit()
stock_predictor.predict_close_prices_for_days(500, with_plot=True)
输出结果为:
结语
本文我们讲解了如何用 HMM 预测股票价格,使用参数估计和模型评估方法来确定股票的收盘价。除了预测股价,HMM 还能用于股市分析以及时间序列数据的分析。
另外一篇使用 Python 进行股票分析的教程:
参考资料:
https://rubikscode.net/2018/10/29/stock-price-prediction-using-hidden-markov-model/