用 MatrixOne 构建一个简单的股票分析 Python 应用程序
本教程将向你展示如何使用 MatrixOne 构建一个简单的 Python 应用程序。
关于 Demo
本教程内所展示的 Demo 将存储中国股市的历史股票数据,并进行简单的分析,找出最适合购买的股票。
Demo 构思:跟踪每支股票的市盈率(PE,Price to Earnings ratio,简称 P/E 或 PER)和市净率(PB,Price to book ratio,简称 P/B 或 PBR)水平,因为市盈率和市净率水平可以代表一家公司的市值。有关市盈率和市净率的概念,参考P/E和P/B。
每天股市收盘后,我们都会将市盈率和市净率值与历史最低市盈率和市净率进行比较。如果当前的市盈率低于历史最低水平,那么这支股票很可能被低估了。我们可以进一步调查它下跌的原因,判断现在是不是买入的好时机。
Demo 源码
你可以直接下载matrix_one_app使用。
开始前准备
准备 1:获取 Tushare API Token
- 你需要有一个 Tushare 帐户并获得一个 API Token。如果你还没有注册 Tushare,在https://tushare.pro/进行注册账户,在https://tushare.pro/user/token#获取 API Token。
Info
Tushare 是一个用于爬行中国股票历史数据的开源工具。它有一个完整的数据集,但要注意 Tushare 的 API 访问限制和访问频率的规则,如需正常使用,请按照 Tushare 的 API 访问权限进行付费。
准备 2:安装基础软件
- 
你还需要安装以下两个基础软件: 
- 
Python 3.x 
- MatrixOne
更多详细信息,参见Python 3 installation tutorial and MatrixOne 安装。
准备 3:安装依赖
- 
(选做)下载源码并且安装 Tushare依赖:a. 下载本篇文章所演示的 Demo 的源代码matrix_one_app。 b. 进入到源码路径 matrixone_python_app/stock_analysis 安装 Tushare依赖项:cd matrixone_python_app/stock_analysis pip3 install -r requirements.txt
- 
安装依赖 pymysql。 
Info
MatrixOne 当前仅支持 Pymysql。暂不支持 SQLAlchemy,MySQL-connector 和 MySQLdb。
步骤 1:编写 Python,准备和加载历史数据集
首先,在 MatrixOne 中加载历史股票数据。
Tushare 界面只允许一次读取5000行数据,所以你只需要收集每只股票最新的5000个交易日的数据。每年大约有250个交易日期。5000构成了近20年的数据,这在很大程度上满足了本次 Demo。
- 
建立 Tushare 界面并获取股票列表: import tushare as ts import time # Set Tushare data source ts.set_token('YOUR_TUSHARE_API_TOKEN') pro = ts.pro_api() # Get the list of stocks pool = pro.stock_basic(exchange = '', list_status = 'L', adj = 'qfq', fields = 'ts_code,symbol,name,area,industry,fullname,list_date, market,exchange,is_hs')
- 
使用调用 daily_basic的方法,用ts_code,trade_date,pe和pb字段的数据帧获取每支股票 P/E 和 P/B 的信息,那么 Tushare 将自动输出5000条最新记录,且无需指定开始日期和结束日期。j = 1 for i in pool.ts_code: print('Getting %d stock,Stock Code %s.' % (j, i)) #The interface is limited to be queried 200 times/minute, some little delays are necessary time.sleep(0.301) j+=1 #Get stock P/E and P/B data frames df = pro.daily_basic(**{ "ts_code": i, "trade_date": "", "start_date": "", "end_date": "", "limit": "", "offset": "" }, fields=[ "ts_code", "trade_date", "pe", "pb" ])
- 
启动 MatrixOne 服务器,在 MatrixOne 创建名为 “stock” 的数据库。 mysql> CREATE DATABASE stock.
- 
将股票数据加载到 MatrixOne 中:使用 pymysql作为 Python 与 MatrixOne 的连接器。import pymysql # Open a MatrixOne connection db = pymysql.connect(host='127.0.0.1', port=6001, user='dump', password='111', database='stock') # Create a cursor object cursor = db.cursor() # Create PE table cursor.execute('CREATE TABLE IF NOT EXISTS pe(ts_code VARCHAR(255), trade_date VARCHAR(255), pe FLOAT, pb FLOAT)')
- 
将每5000条数据记录加载到 MatrixOne 中: if df.empty == False: # Fill P/E and P/B values which are NaN df = df.fillna(0.0) val_to_insert = df.values.tolist() cursor.executemany(" insert into pe (ts_code, trade_date,pe,pb) values (%s, %s,%s,%s)", val_to_insert)
步骤 2:找到历史最低市盈率或市盈率股票
- 
将历史股票数据加载到 MatrixOne 后,查看数据记录,可以看到大约有1100万条记录: mysql> select count(*) from pe; +----------+ | count(*) | +----------+ | 11233508 | +----------+ 1 row in set (0.16 sec)
- 
通过 Python 计算每支股票的最低 P/E 和 P/B: # Find stocks that the current P/E is even lower than the historical lowest cursor.execute('select ts_code,min(pe) from pe where pe>0 group by ts_code order by ts_code') # Fetch the result as python object value = cursor.fetchall() # Find stocks that the current P/B is even lower than the historical lowest cursor.execute('select ts_code,min(pb) from pe where pb>0 group by ts_code order by ts_code') # Fetch the result as python object value = cursor.fetchall()
- 
再次调用 Tushare 的 “daily_basic” 接口,得到当前的市盈率和市盈率水平,并进行搜索,找到当前市盈率或市盈率甚至低于历史最低水平的股票。你可以切换任何你想要的交易日期进行比较。这里以市盈率为例: df = pro.daily_basic(**{ "ts_code": "", "trade_date": sys.argv[1], "start_date": "", "end_date": "", "limit": "", "offset": "" }, fields=[ "ts_code", "pe" ]) df = df.fillna(0.0) for i in range(0,len(value)): ts_code, min_pe = value[i] for j in range(0, len(df.ts_code)): if ts_code == df.ts_code[j] and min_pe > df.pe[j] > 0: logging.getLogger().info("ts_code: %s", ts_code) logging.getLogger().info("history lowest PE : %f", min_pe) logging.getLogger().info("current PE found: %f", df.pe[j])
- 
执行完以上代码,将打印出每支市盈率或市盈率最低的股票。 
步骤 3:升级数据集
在上述步骤中,你已经在 MatrixOne 中存储了许多历史数据,每次进行数据分析时,通常都经历了一个新的交易日。那么,数据集需要使用最新的数据进行更新,否则下一次你仍然会与旧数据进行比较。
无需再次插入所有数据,你只需插入具有最新交易日期的数据:
# get updating trading dates by user argument inputs
df = pro.daily_basic(**{
            "ts_code": i,
            "trade_date": "",
            "start_date": self.startDate,,
            "end_date": self.endDate,
            "limit": "",
            "offset": ""
}, fields=[
            "ts_code",
            "trade_date",
            "pe",
            "pb"
])
# append update to MatrixOne
if df.empty == False:
            df = df.fillna(0.0)
            val_to_insert = df.values.tolist()
            print(val_to_insert)
            cursor.executemany(" insert into pe (ts_code, trade_date,pe,pb) values (%s, %s,%s,%s)", val_to_insert)