from vnpy.trader.constant import (Exchange, Interval)
import pandas as pd
# 读取需要入库的csv文件,该文件是用gbk编码
imported_data = pd.read_csv('需要入库的数据的绝对路径',encoding='gbk')
# 将csv文件中 `市场代码`的 SC 替换成 Exchange.SHFE SHFE
imported_data['市场代码'] = Exchange.SHFE
# 增加一列数据 `inteval`,且该列数据的所有值都是 Interval.MINUTE
imported_data['interval'] = Interval.MINUTE
接下来,我们还需要对每列数据的数据类型进行修改,确保和 BarData
中各个属性的数据类型一致。BarData
中属性的数据类型可以分为三大类:float
类, datetime
类 和 自定义枚举类 (Interval
和 Exchange
)。因为,上面已经修改过了Interval
和 Exchange
,下面只需要修改 float
和 datetime
类。
修改 float
类代码:
# 明确需要是float数据类型的列
float_columns = ['开', '高', '低', '收', '成交量', '持仓量']
for col in float_columns:
imported_data[col] = imported_data[col].astype('float')
修改 datatime
类代码:
# 明确时间戳的格式
# %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
datetime_format = '%Y%m%d %H:%M:%S'
imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)
下一步,我们还需要对列名进行修改:
# 因为没有用到 成交额 这一列的数据,所以该列列名不变
imported_data.columns = ['exchange','symbol','datetime','open','high','low','close','volume','成交额','open_interest','interval']
另外,因为该csv文件储存的是ag的主力连续数据,即多张ag合约的拼接。因此,symbol
列中有多个不同到期日的ag合约代码,这里需要将合约代码统一为ag88
:
imported_data['symbol'] ='ag88'
最后,我们使用 vn.py 封装好的 database_manager.save_bar_data
将数据入库:
# 导入 database_manager 模块
from vnpy.trader.database import database_manager
from vnpy.trader.object import (BarData,TickData)
# 封装函数
def move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str):
bars = []
start = None
count = 0
for row in imported_data.itertuples():
bar = BarData(
symbol=row.symbol,
exchange=row.exchange,
datetime=row.datetime,
interval=row.interval,
volume=row.volume,
open_price=row.open,
high_price=row.high,
low_price=row.low,
close_price=row.close,
open_interest=row.open_interest,
gateway_name="DB",
bars.append(bar)
# do some statistics
count += 1
if not start:
start = bar.datetime
end = bar.datetime
# insert into database
database_manager.save_bar_data(bars, collection_name)
print(f"Insert Bar: {count} from {start} - {end}")
如果,默认的数据库是其它vn.py支持的数据库,上面的代码需要做略微修改后便可以使用(详情请看结尾Debug部分)。
如果,没有设置分表储存不同类型的数据。则需要先将move_df_to_mongodb
函数中的collection_name
参数删除,同时将上面代码的倒数第二行修改为:
database_manager.save_bar_data(bars)
如果,想要将数据储存储存在 SQLite 数据库中也很简单(默认数据库不是SQLite)。只需要两步就可以完成。
创建一个sqlite数据库连接对象:
from vnpy.trader.database.initialize import init_sql
from vnpy.trader.database.database import Driver
settings={
"database": "database.db",
"host": "localhost",
"port": 3306,
"user": "root",
"password": "",
"authentication_source": "admin"
sqlite_manager = init_sql(driver=Driver.SQLITE, settings=settings)
2.使用sqlite数据库连接对象将数据入库
# 替换函数 move_df_to_mongodb 的倒数第二行
sqlite_manager.save_bar_data(bars)
本文尝试从数据库配置,数据入库基本流程,数据入库具体实现,三部分来帮助vn.py新用户解决编写python脚本实现数据入库这个难点。借助vn.py的database_manager
模块,用户基本上可以无缝切换SQLite,MongoDB等vn.py支持的数据库来读取和存入数据。希望这篇文章能帮助大家快速进入量化策略的研究和开发。
Debug
使用上述代码进行Sqlite
数据入库的时候,会出现peewee.InterfaceError: Error binding parameter 2 - probably unsupported type
错误,解决方法:
找到imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)
代码所在行
在该行代码下键入imported_data['时间'] = imported_data['时间'].dt.strftime('%Y%m%d %H:%M:%S')
详细的Debug过程记录在sqlite数据入库Debug. 将该文件夹内的内容下载到本地同一个位置,运行Jupyter Notebook就可以复现整个过程.
from vnpy.trader.constant import (Exchange, Interval)
import pandas as pd
from vnpy.trader.database import database_manager
from vnpy.trader.object import (BarData,TickData)
# 封装函数
def move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str):
bars = []
start = None
count = 0
for row in imported_data.itertuples():
bar = BarData(
symbol=row.symbol,
exchange=row.exchange,
datetime=row.datetime,
interval=row.interval,
volume=row.volume,
open_price=row.open,
high_price=row.high,
low_price=row.low,
close_price=row.close,
open_interest=row.open_interest,
gateway_name="DB",
bars.append(bar)
# do some statistics
count += 1
if not start:
start = bar.datetime
end = bar.datetime
# insert into database
database_manager.save_bar_data(bars, collection_name)
print(f'Insert Bar: {count} from {start} - {end}')
if __name__ == "__main__":
# 读取需要入库的csv文件,该文件是用gbk编码
imported_data = pd.read_csv('D:/1分钟数据压缩包/FutAC_Min1_Std_2016/ag主力连续.csv',encoding='gbk')
# 将csv文件中 `市场代码`的 SC 替换成 Exchange.SHFE SHFE
imported_data['市场代码'] = Exchange.SHFE
# 增加一列数据 `inteval`,且该列数据的所有值都是 Interval.MINUTE
imported_data['interval'] = Interval.MINUTE
# 明确需要是float数据类型的列
float_columns = ['开', '高', '低', '收', '成交量', '持仓量']
for col in float_columns:
imported_data[col] = imported_data[col].astype('float')
# 明确时间戳的格式
# %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
datetime_format = '%Y%m%d %H:%M:%S'
imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)
# 因为没有用到 成交额 这一列的数据,所以该列列名不变
imported_data.columns = ['exchange','symbol','datetime','open','high','low','close','volume','成交额','open_interest','interval']
imported_data['symbol'] ='ag88'
move_df_to_mongodb(imported_data,'ag88')
需要进行如下操作:
找到imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)
代码
在上述代码下增加imported_data['时间'] = imported_data['时间'].dt.strftime('%Y%m%d %H:%M:%S')
完整的可交互Debug过程请参考sqlite数据入库Debug。将该文件夹内的内容下载到本地同一个位置,运行Jupyter Notebook 就可以使用.
找到imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)代码所在行
在该行代码下键入imported_data['时间'] = imported_data['时间'].dt.strftime('%Y%m%d %H:%M:%S')
请问,第二行后面的,这里面的 dt 在哪里定义的,直接调用会报错吗?
但是,我提的这个问题,你说的原帖给的debug过程记录里应该有吧,这个里面好像没有。
其实我的问题是:
imported_data['时间'] = imported_data['时间'].dt.strftime('%Y%m%d %H:%M:%S')
这行代码是csv导入sqlite的功能脚本文件里面的程序,dt 在哪里定义的,好像没找到,直接引用语法错误吗,为什么这样语句,程序编译正常呢?
dt就是pands.series.dt.strftime的dt,官方文档在https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.dt.strftime.html?highlight=dt%20strft#pandas.Series.dt.strftime
不然你可以尝试一下把dt换成datetime, 应该会报错
symbol=row.symbol,
exchange=row.exchange,
datetime=row.datetime,#报错的地方,需要增加时区信息
interval=row.interval,
volume=row.volume,
open_price=row.open,
high_price=row.high,
low_price=row.low,
close_price=row.close,
open_interest=row.open_interest,
gateway_name="DB",
运行会报错:
AttributeError: 'str' object has no attribute 'astimezone'
原因是新版本vnpy支持了时区数据,所以datetime=row.datetime
需要加上时区信息
from datetime import datetime, timedelta, timezone
# 中国时区是+8,对应参数hours=8
# 日本时区是+9,hours=9
utc_8 = timezone(timedelta(hours=8))
datetime=row.datetime.replace(tzinfo=utc_8)
一个小技巧,把bar里面的数据打印出来,看看都是什么格式。
sql_manager.get_oldest_bar_data()
symbol=row.symbol,
exchange=row.exchange,
datetime=row.datetime,#报错的地方,需要增加时区信息
interval=row.interval,
volume=row.volume,
open_price=row.open,
high_price=row.high,
low_price=row.low,
close_price=row.close,
open_interest=row.open_interest,
gateway_name="DB",
运行会报错:
AttributeError: 'str' object has no attribute 'astimezone'
原因是新版本vnpy支持了时区数据,所以datetime=row.datetime
需要加上时区信息
from datetime import datetime, timedelta, timezone
# 中国时区是+8,对应参数hours=8
# 日本时区是+9,hours=9
utc_8 = timezone(timedelta(hours=8))
datetime=row.datetime.replace(tzinfo=utc_8)
一个小技巧,把bar里面的数据打印出来,看看都是什么格式。
sql_manager.get_oldest_bar_data()
大佬牛!!!出错后发现真是这个问题导致的,必须加时区
Traceback (most recent call last):
File "d:/study/论坛的入库程序.py", line 43, in <module>
move_df_to_mongodb(imported_data)
File "d:/study/论坛的入库程序.py", line 26, in move_df_to_mongodb
gateway_name="DB",
File "<string>", line 14, in init
File "D:\Python\Miniconda3\lib\site-packages\vnpy\trader\object.py", line 99, in post_init
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
AttributeError: 'str' object has no attribute 'value'
奇怪我的报错为啥和别人不同?版本2.3
xiaohe wrote:
你的exchange是一个str,没有value。检查一下格式吧
谢谢,好像是格式问题,虽然我赋值的时候是类属性,但是我没有直接导入,而是重新写回了CSV文件,结果写回之后变成了str类,这点我还没搞明白。
现在又有新问题了,
from vnpy.trader.constant import (Exchange, Interval)
import pandas as pd
from vnpy.trader.database import database_manager
from vnpy.trader.object import (BarData, TickData)
def move_df_to_mongodb(imported_data: pd.DataFrame):
bars = []
start = None
count = 0
for row in imported_data.itertuples():
bar = BarData(
symbol=row.symbol,
exchange=row.exchange,
datetime=row.datetime,
interval=row.interval,
volume=row.volume,
open_price=row.open,
high_price=row.high,
low_price=row.low,
close_price=row.close,
open_interest=row.open_interest,
gateway_name="DB",
bars.append(bar)
# do some statistics
count += 1
if not start:
start = bar.datetime
end = bar.datetime
# insert into database
database_manager.save_bar_data(bars)
print(f'Insert Bar: {count} from {start} - {end}')
if name == "main":
datatype = 1 # 1分钟数据用,2日线数据用
csvfilepath = 'e:\\test\\1test.csv'
# 读取需要入库的csv文件,该文件是用gbk编码
imported_data = pd.read_csv(csvfilepath, encoding='gbk')
# 将csv文件中 `市场代码`的 SC 替换成 Exchange.SHFE SHFE
imported_data.loc[imported_data['exchange'].values == 'SSE', 'exchange'] = Exchange.SSE
imported_data.loc[imported_data['exchange'].values == 'SZSE', 'exchange'] = Exchange.SZSE
imported_data.loc[imported_data['exchange'].values == 'CFFEX', 'exchange'] = Exchange.CFFEX
# 增加一列数据 `inteval`,且该列数据的所有值都是 Interval.MINUTE
if datatype == 1:
imported_data['interval'] = Interval.MINUTE
elif datatype == 2:
imported_data['interval'] = Interval.DAILY
# 明确需要是float数据类型的列
float_columns = ['open', 'high', 'low', 'close', 'volume', 'open_interest']
for col in float_columns:
imported_data[col] = imported_data[col].astype('float')
# 明确时间戳的格式
# %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
datetime_format = '%Y/%m/%d %H:%M:%S'
imported_data['datetime'] = pd.to_datetime(imported_data['datetime'], format=datetime_format)
# 因为没有用到 成交额 这一列的数据,所以该列列名不变
imported_data.columns = ['exchange', 'symbol', 'name', 'datetime', 'open', 'high', 'low', 'close', 'volume',
'open_interest','interval']
move_df_to_mongodb(imported_data)
我因为是导入A股数据,本地时间格式2021/6/1 9:31:00 这样的,csv格式如下
VNPY的版本是2.3,现在出了
Traceback (most recent call last):
File "D:/study/论坛的入库程序.py", line 68, in <module>
move_df_to_mongodb(imported_data)
File "D:/study/论坛的入库程序.py", line 37, in move_df_to_mongodb
database_manager.save_bar_data(bars)
File "D:\Python\Miniconda3\lib\site-packages\vnpy\database\mongodb\mongodb_database.py", line 163, in save_bar_data
bar.datetime = convert_tz(bar.datetime)
File "D:\Python\Miniconda3\lib\site-packages\vnpy\trader\database.py", line 20, in convert_tz
dt = dt.astimezone(DB_TZ)
File "pandas/_libs/tslibs/timestamps.pyx", line 884, in pandas._libs.tslibs.timestamps.Timestamp.tz_convert
TypeError: Cannot convert tz-naive Timestamp, use tz_localize to localize
我看到楼上有兄弟同样的问题说是换了下时间格式可以了,麻烦问下,我这种情况,时间格式应该用什么的?
补充:我后来又print了一下datetime字段 print(imported_data['datetime'])
居然显示是3359 2021-06-09 15:00:00
Name: datetime, Length: 3360, dtype: object
于是后来我又试着把 # 明确时间戳的格式
# %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
datetime_format = '%Y/%m/%d %H:%M:%S'
这句修改成datetime_format = '%Y-%m-%d %H:%M:%S' 结果依然报错TypeError: Cannot convert tz-naive Timestamp, use tz_localize to localize
甚至也改过datetime_format = '%Y%m%d %H:%M:%S' 还是不对
我也不知道怎么办了,麻烦哪位能帮我解答一下,谢谢
ruzwdy wrote:
"" move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str)""
这里怎么看出来是 用 MongoDB 数据库的 .
请问你的问题是?
楼主写这个函数因为他自己用的是mongodb