添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

如何编写一个python脚本将本地 .csv 文件导入数据库是vn.py论坛中新用户提问较多的问题之一。本文的主要目的是帮助新用户解决数据入库问题,以便用户可以快速进入量化策略的开发。

本文主要分为三大部分,第一部分介绍了在vn.py中使用MongoDB数据库所需要进行的配置 (只打算使用vn.py默认SQLite数据库的用户,可以简单了解一下 )。第二部分介绍了数据入库的基本流程 (适用于vn.py支持的所有数据库)。最后一部分则是具体的实现:分别将数据导入MongoDB和SQLite数据库(适用于vn.py支持的所有数据库)。

另外,在正文开始之前,还需要提醒大家:在将数据导入数据库之前,务必要确保这些数据已经是 被清洗过 干净 的数据。如果将没有被清洗过,质量差的数据直接入库进行回测,可能会导致各种问题,影响策略开发的效率。因此,建议大家使用 高质量 的数据源。

配置数据库

vn.py 中默认使用 SQLite 数据库。 因此,如果需要使用 MongoDB 数据库,则需要修改 vn.py 的全局配置。具体流程如下:

  • C:\Users\你的用户名\.vntrader 目录下找到 vt_setting.json 文件

  • vt_setting.json 文件中的 database.driver,database.database,database.host,database.port 进行修改。

    下图是我自己设置的数据库配置信息:

    上图中的两个 "mongodb" 可能让人会有些困扰。实际上第一个 "mongodb" 是告诉 vn.py 我们使用的数据库类型是 MongoDB 数据库而不是默认的 SQLite 数据库。 第二个 "mongodb" 是告诉 vn.py 回测所需要的数据储存在 MongoDB 数据库中一个叫做 "mongodb" 的 database 中。这样说可能有些绕口,请看下图:

    上图是 MongoDB 数据库的官方图形界面。我们可以清楚的看到在该 MongoDB 数据库中一共有四个 database, 分别是 admin,config,local, mongodb 。 另外, mongodb database 中 分别 储存了 不同类型 的期货数据。比如, AP888,IF888, RB888 等。在该 database 中,不同类型的期货数据, 分别 储存在不同的 collection(表)中。vn.py默认是将所有的期货数据储存在同一个collection中并进行读取。 如果,你想让 vn.py 将不同类型的期货数据分别储存在不同的collection中。请阅读这两篇文章 vn.py社区精选18 - 老用户福音,MongDB分表重构 vn.py社区精选19 - 福音收尾,MongoDB分表读取数据

    数据入库基本流程

    在配置 MongoDB 数据库和设置好 MongoDB 分表读取之后(不配置数据库或设置分表读取,不影响后面的内容),我们可以正式开始讨论数据入库的基本流程。vn.py 提供了很多工具使数据入库这个过程变得简单,快捷。下面是数据入库的基本流程:

  • 先确定要入库的数据是 Tick 还是 Bar (K线) 类型数据

  • 将需要入库的数据转化成 vn.py 定义的 TickData BarData 数据类型

  • 使用 vn.py 提供的数据入库工具函数 database_manager.save_tick_data database_manager.save_bar_data 将相应的 TickData BarData 入库

    从上面的数据入库流程中可以看出,在 vn.py 中数据入库的流程还是比较简单的。难点集中在第二步(将本地数据转换成vn.py 定义的 TickData BarData )。一般来说,就是对数据的时间戳处理上。另外,如果数据本身的质量不高,比如数据的整个时间戳格式前后不一致,那需要先对数据的时间戳格式进行统一。时间戳相关知识请查看 Python时间日期格式化之time与datetime模块 这篇文章。

    数据入库具体实现

    在了解了数据入库的基本流程之后,我们来实现一次数据入库的过程。首先,来看一看我们要入库的数据:

    从上图可以看出,C 列储存的是表示时间的数据且 C2 和 C3 的时间间隔是1分钟。所以,要入库的数据是1分钟的 Bar (K线)数据类型。下面我们进行第二步:将需要入库的数据转化成 vn.py 定义的 BarData

    首先,我们先来认识一下 vn.py 中的 BarData

    从上图中可以看出, BarData 一共有11个属性。其中, BarData.vt_symbol 会在 BarData 实例化的时候自动生成。另外,需要指出的是 BarData.exchange BarData.inteval 的数据类型分别是 vn.py 中定义好的枚举常量 Exchange Inteval BarData.datetime 则是 python 标准库 datetime 中的 datetime 数据类型。

    Exchange 数据结构的代码:

    Interval 数据结构的代码:

    在认识了vn.py 中的 BarData 之后,我们开始着手将需要入库的数据转化成 BarData 类型数据。再来重温一下,需要入库数据的格式:

    通过和上文 BarData 的数据结构对比,我们有以下几个发现:

  • csv文件中的 合约代码 时间 开 高 低 收 成交量 持仓量 BarData 中的 symbol datetime open_price high_price low_price close_price volume open_interest 一一对应(从名称就就可以看出)。

  • csv文件中 市场代码 没办法和 BarData 中的 exchange 对应。因为csv文件中 市场代码 都是 SC ,而在上图 Exchange 数据结构代码截图中找不到和 SC 对应的枚举常量的绑定值。从合约代码 ag1608(沪银1608) 可以推断出这里的 SC 指的就是上海期货交易所,对应的枚举常量是 Exchang.SHFE

  • csv文件中缺少了和 BarData 中的 interval 相对应的数据。上文我们已经发现了 csv文件中储存的是1分钟的 BarData ,对应的枚举常量是 Interval.MINUTE

    基于上面的发现,很自然的,我们需要进行如下的操作:

  • 将csv文件中 市场代码 SC 替换成 Exchang.SHFE

  • 增加一列数据,且该列数据的所有值都是 Interval.MINUTE

    一般情况下,使用 python 的 pandas 库可以方便的完成上面的操作。如果数据的质量较差,比如数据的分隔符设置存在问题,会使得 pd.read_csv 函数没办法正确的读取 .csv 文件。这时则需要使用python的 csv 库。本文的数据入库过程统一使用 pandas 来完成。 具体操作,如下:

    from vnpy.trader.constant import (Exchange, Interval)
    import pandas as pd
    # 读取需要入库的csv文件,该文件是用gbk编码
    imported_data = pd.read_csv('需要入库的数据的绝对路径',encoding='gbk')
    # 将csv文件中 `市场代码`的 SC 替换成 Exchange.SHFE SHFE
    imported_data['市场代码'] = Exchange.SHFE
    # 增加一列数据 `inteval`,且该列数据的所有值都是 Interval.MINUTE
    imported_data['interval'] = Interval.MINUTE
    

    接下来,我们还需要对每列数据的数据类型进行修改,确保和 BarData 中各个属性的数据类型一致。BarData中属性的数据类型可以分为三大类:float 类, datetime 类 和 自定义枚举类 (IntervalExchange)。因为,上面已经修改过了IntervalExchange,下面只需要修改 floatdatetime 类。

    修改 float 类代码:

    # 明确需要是float数据类型的列
    float_columns = ['开', '高', '低', '收', '成交量', '持仓量']
    for col in float_columns:
      imported_data[col] = imported_data[col].astype('float')
    

    修改 datatime 类代码:

    # 明确时间戳的格式
    # %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
    datetime_format = '%Y%m%d %H:%M:%S'
    imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)
    

    下一步,我们还需要对列名进行修改:

    # 因为没有用到 成交额 这一列的数据,所以该列列名不变
    imported_data.columns = ['exchange','symbol','datetime','open','high','low','close','volume','成交额','open_interest','interval']
    

    另外,因为该csv文件储存的是ag的主力连续数据,即多张ag合约的拼接。因此,symbol列中有多个不同到期日的ag合约代码,这里需要将合约代码统一为ag88

    imported_data['symbol'] ='ag88'
    

    最后,我们使用 vn.py 封装好的 database_manager.save_bar_data 将数据入库:

    # 导入 database_manager 模块
    from vnpy.trader.database import database_manager
    from vnpy.trader.object import (BarData,TickData)
    # 封装函数
    def move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str):
        bars = []
        start = None
        count = 0
        for row in imported_data.itertuples():
            bar = BarData(
                  symbol=row.symbol,
                  exchange=row.exchange,
                  datetime=row.datetime,
                  interval=row.interval,
                  volume=row.volume,
                  open_price=row.open,
                  high_price=row.high,
                  low_price=row.low,
                  close_price=row.close,
                  open_interest=row.open_interest,
                  gateway_name="DB",
            bars.append(bar)
            # do some statistics
            count += 1
            if not start:
                start = bar.datetime
        end = bar.datetime
        # insert into database
        database_manager.save_bar_data(bars, collection_name)
        print(f"Insert Bar: {count} from {start} - {end}")
    

    如果,默认的数据库是其它vn.py支持的数据库,上面的代码需要做略微修改后便可以使用(详情请看结尾Debug部分)。

    如果,没有设置分表储存不同类型的数据。则需要先将move_df_to_mongodb函数中的collection_name参数删除,同时将上面代码的倒数第二行修改为:

    database_manager.save_bar_data(bars)
    

    如果,想要将数据储存储存在 SQLite 数据库中也很简单(默认数据库不是SQLite)。只需要两步就可以完成。

  • 创建一个sqlite数据库连接对象:
  • from vnpy.trader.database.initialize import init_sql
    from vnpy.trader.database.database import Driver
    settings={
        "database": "database.db",
        "host": "localhost",
        "port": 3306,
        "user": "root",
        "password": "",
        "authentication_source": "admin"
    sqlite_manager = init_sql(driver=Driver.SQLITE, settings=settings)
    

    2.使用sqlite数据库连接对象将数据入库

    # 替换函数 move_df_to_mongodb 的倒数第二行
    sqlite_manager.save_bar_data(bars)
    

    本文尝试从数据库配置,数据入库基本流程,数据入库具体实现,三部分来帮助vn.py新用户解决编写python脚本实现数据入库这个难点。借助vn.py的database_manager模块,用户基本上可以无缝切换SQLite,MongoDB等vn.py支持的数据库来读取和存入数据。希望这篇文章能帮助大家快速进入量化策略的研究和开发。

    Debug

    使用上述代码进行Sqlite数据入库的时候,会出现peewee.InterfaceError: Error binding parameter 2 - probably unsupported type错误,解决方法:

  • 找到imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)代码所在行
  • 在该行代码下键入imported_data['时间'] = imported_data['时间'].dt.strftime('%Y%m%d %H:%M:%S')
  • 详细的Debug过程记录在sqlite数据入库Debug. 将该文件夹内的内容下载到本地同一个位置,运行Jupyter Notebook就可以复现整个过程.

    from vnpy.trader.constant import (Exchange, Interval)
    import pandas as pd
    from vnpy.trader.database import database_manager
    from vnpy.trader.object import (BarData,TickData)
    # 封装函数
    def move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str):
        bars = []
        start = None
        count = 0
        for row in imported_data.itertuples():
            bar = BarData(
                  symbol=row.symbol,
                  exchange=row.exchange,
                  datetime=row.datetime,
                  interval=row.interval,
                  volume=row.volume,
                  open_price=row.open,
                  high_price=row.high,
                  low_price=row.low,
                  close_price=row.close,
                  open_interest=row.open_interest,
                  gateway_name="DB",
            bars.append(bar)
            # do some statistics
            count += 1
            if not start:
                start = bar.datetime
        end = bar.datetime
        # insert into database
        database_manager.save_bar_data(bars, collection_name)
        print(f'Insert Bar: {count} from {start} - {end}')
    if __name__ == "__main__":
        # 读取需要入库的csv文件,该文件是用gbk编码
        imported_data = pd.read_csv('D:/1分钟数据压缩包/FutAC_Min1_Std_2016/ag主力连续.csv',encoding='gbk')
        # 将csv文件中 `市场代码`的 SC 替换成 Exchange.SHFE SHFE
        imported_data['市场代码'] = Exchange.SHFE
        # 增加一列数据 `inteval`,且该列数据的所有值都是 Interval.MINUTE
        imported_data['interval'] = Interval.MINUTE
        # 明确需要是float数据类型的列
        float_columns = ['开', '高', '低', '收', '成交量', '持仓量']
        for col in float_columns:
          imported_data[col] = imported_data[col].astype('float')
        # 明确时间戳的格式
        # %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
        datetime_format = '%Y%m%d %H:%M:%S'
        imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)
        # 因为没有用到 成交额 这一列的数据,所以该列列名不变
        imported_data.columns = ['exchange','symbol','datetime','open','high','low','close','volume','成交额','open_interest','interval']
        imported_data['symbol'] ='ag88'
        move_df_to_mongodb(imported_data,'ag88')
    需要进行如下操作:

  • 找到imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)代码
  • 在上述代码下增加imported_data['时间'] = imported_data['时间'].dt.strftime('%Y%m%d %H:%M:%S')
  • 完整的可交互Debug过程请参考sqlite数据入库Debug。将该文件夹内的内容下载到本地同一个位置,运行Jupyter Notebook 就可以使用.

    找到imported_data['时间'] = pd.to_datetime(imported_data['时间'],format=datetime_format)代码所在行
    在该行代码下键入imported_data['时间'] = imported_data['时间'].dt.strftime('%Y%m%d %H:%M:%S')

    请问,第二行后面的,这里面的 dt 在哪里定义的,直接调用会报错吗?

    但是,我提的这个问题,你说的原帖给的debug过程记录里应该有吧,这个里面好像没有。

    其实我的问题是:
    imported_data['时间'] = imported_data['时间'].dt.strftime('%Y%m%d %H:%M:%S')
    这行代码是csv导入sqlite的功能脚本文件里面的程序,dt 在哪里定义的,好像没找到,直接引用语法错误吗,为什么这样语句,程序编译正常呢?

    dt就是pands.series.dt.strftime的dt,官方文档在https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.dt.strftime.html?highlight=dt%20strft#pandas.Series.dt.strftime
    不然你可以尝试一下把dt换成datetime, 应该会报错

    symbol=row.symbol, exchange=row.exchange, datetime=row.datetime,#报错的地方,需要增加时区信息 interval=row.interval, volume=row.volume, open_price=row.open, high_price=row.high, low_price=row.low, close_price=row.close, open_interest=row.open_interest, gateway_name="DB",

    运行会报错:

    AttributeError: 'str' object has no attribute 'astimezone'

    原因是新版本vnpy支持了时区数据,所以datetime=row.datetime需要加上时区信息

    from datetime import datetime, timedelta, timezone
    # 中国时区是+8,对应参数hours=8
    # 日本时区是+9,hours=9
    utc_8 = timezone(timedelta(hours=8))
    datetime=row.datetime.replace(tzinfo=utc_8)
    

    一个小技巧,把bar里面的数据打印出来,看看都是什么格式。

    sql_manager.get_oldest_bar_data()
                  symbol=row.symbol,
                  exchange=row.exchange,
                  datetime=row.datetime,#报错的地方,需要增加时区信息
                  interval=row.interval,
                  volume=row.volume,
                  open_price=row.open,
                  high_price=row.high,
                  low_price=row.low,
                  close_price=row.close,
                  open_interest=row.open_interest,
                  gateway_name="DB",
    

    运行会报错:

    AttributeError: 'str' object has no attribute 'astimezone'

    原因是新版本vnpy支持了时区数据,所以datetime=row.datetime需要加上时区信息

    from datetime import datetime, timedelta, timezone
    # 中国时区是+8,对应参数hours=8
    # 日本时区是+9,hours=9
    utc_8 = timezone(timedelta(hours=8))
    datetime=row.datetime.replace(tzinfo=utc_8)
    

    一个小技巧,把bar里面的数据打印出来,看看都是什么格式。

    sql_manager.get_oldest_bar_data()
    

    大佬牛!!!出错后发现真是这个问题导致的,必须加时区

    Traceback (most recent call last):
    File "d:/study/论坛的入库程序.py", line 43, in <module>
    move_df_to_mongodb(imported_data)
    File "d:/study/论坛的入库程序.py", line 26, in move_df_to_mongodb
    gateway_name="DB",
    File "<string>", line 14, in init
    File "D:\Python\Miniconda3\lib\site-packages\vnpy\trader\object.py", line 99, in post_init
    self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
    AttributeError: 'str' object has no attribute 'value'
    奇怪我的报错为啥和别人不同?版本2.3

    xiaohe wrote:

    你的exchange是一个str,没有value。检查一下格式吧
    谢谢,好像是格式问题,虽然我赋值的时候是类属性,但是我没有直接导入,而是重新写回了CSV文件,结果写回之后变成了str类,这点我还没搞明白。
    现在又有新问题了,
    from vnpy.trader.constant import (Exchange, Interval)
    import pandas as pd
    from vnpy.trader.database import database_manager
    from vnpy.trader.object import (BarData, TickData)

    def move_df_to_mongodb(imported_data: pd.DataFrame):
    bars = []
    start = None
    count = 0

    for row in imported_data.itertuples():
        bar = BarData(
            symbol=row.symbol,
            exchange=row.exchange,
            datetime=row.datetime,
            interval=row.interval,
            volume=row.volume,
            open_price=row.open,
            high_price=row.high,
            low_price=row.low,
            close_price=row.close,
            open_interest=row.open_interest,
            gateway_name="DB",
        bars.append(bar)
        # do some statistics
        count += 1
        if not start:
            start = bar.datetime
    end = bar.datetime
    # insert into database
    database_manager.save_bar_data(bars)
    print(f'Insert Bar: {count} from {start} - {end}')
    

    if name == "main":
    datatype = 1 # 1分钟数据用,2日线数据用

    csvfilepath = 'e:\\test\\1test.csv'
    # 读取需要入库的csv文件,该文件是用gbk编码
    imported_data = pd.read_csv(csvfilepath, encoding='gbk')
    # 将csv文件中 `市场代码`的 SC 替换成 Exchange.SHFE SHFE
    imported_data.loc[imported_data['exchange'].values == 'SSE', 'exchange'] = Exchange.SSE
    imported_data.loc[imported_data['exchange'].values == 'SZSE', 'exchange'] = Exchange.SZSE
    imported_data.loc[imported_data['exchange'].values == 'CFFEX', 'exchange'] = Exchange.CFFEX
    # 增加一列数据 `inteval`,且该列数据的所有值都是 Interval.MINUTE
    if datatype == 1:
        imported_data['interval'] = Interval.MINUTE
    elif datatype == 2:
        imported_data['interval'] = Interval.DAILY
    # 明确需要是float数据类型的列
    float_columns = ['open', 'high', 'low', 'close', 'volume', 'open_interest']
    for col in float_columns:
        imported_data[col] = imported_data[col].astype('float')
    # 明确时间戳的格式
    # %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
    datetime_format = '%Y/%m/%d %H:%M:%S'
    imported_data['datetime'] = pd.to_datetime(imported_data['datetime'], format=datetime_format)
    # 因为没有用到 成交额 这一列的数据,所以该列列名不变
    imported_data.columns = ['exchange', 'symbol', 'name', 'datetime', 'open', 'high', 'low', 'close', 'volume',
                             'open_interest','interval']
    move_df_to_mongodb(imported_data)
    我因为是导入A股数据,本地时间格式2021/6/1 9:31:00 这样的,csv格式如下

    VNPY的版本是2.3,现在出了
    Traceback (most recent call last):
    File "D:/study/论坛的入库程序.py", line 68, in <module>
    move_df_to_mongodb(imported_data)
    File "D:/study/论坛的入库程序.py", line 37, in move_df_to_mongodb
    database_manager.save_bar_data(bars)
    File "D:\Python\Miniconda3\lib\site-packages\vnpy\database\mongodb\mongodb_database.py", line 163, in save_bar_data
    bar.datetime = convert_tz(bar.datetime)
    File "D:\Python\Miniconda3\lib\site-packages\vnpy\trader\database.py", line 20, in convert_tz
    dt = dt.astimezone(DB_TZ)
    File "pandas/_libs/tslibs/timestamps.pyx", line 884, in pandas._libs.tslibs.timestamps.Timestamp.tz_convert
    TypeError: Cannot convert tz-naive Timestamp, use tz_localize to localize

    我看到楼上有兄弟同样的问题说是换了下时间格式可以了,麻烦问下,我这种情况,时间格式应该用什么的?
    补充:我后来又print了一下datetime字段 print(imported_data['datetime'])
    居然显示是3359 2021-06-09 15:00:00
    Name: datetime, Length: 3360, dtype: object
    于是后来我又试着把 # 明确时间戳的格式

    # %Y/%m/%d %H:%M:%S 代表着你的csv数据中的时间戳必须是 2020/05/01 08:32:30 格式
    datetime_format = '%Y/%m/%d %H:%M:%S'
    

    这句修改成datetime_format = '%Y-%m-%d %H:%M:%S' 结果依然报错TypeError: Cannot convert tz-naive Timestamp, use tz_localize to localize
    甚至也改过datetime_format = '%Y%m%d %H:%M:%S' 还是不对
    我也不知道怎么办了,麻烦哪位能帮我解答一下,谢谢

    ruzwdy wrote:

    "" move_df_to_mongodb(imported_data:pd.DataFrame,collection_name:str)""
    这里怎么看出来是 用 MongoDB 数据库的 .
    请问你的问题是?
    楼主写这个函数因为他自己用的是mongodb

  •