3. 历史数据的存储——DataSource对象

DataSource 负责对接本地历史数据存储,可以管理基于文件的存储(如 csv/hdf/feather) 或基于数据库的存储(如 MySQL/MariaDB),对上层暴露统一的表结构与读取接口;数据的 下载与更新通常通过 qt.refill_data_source() 完成,而非由 DataSource 自动拉取。

class qteasy.DataSource(source_type: str = 'file', file_type: str = 'csv', file_loc: str = 'data/', host: str = 'localhost', port: int = 3306, user: Optional[str] = None, password: Optional[str] = None, db_name: str = 'qt_db', allow_drop_table: bool = False)[源代码]

管理本地历史数据存储(文件或数据库)的统一入口对象。

DataSource 负责与本地文件或数据库交互,统一管理历史数据表的读取、写入与概览, 并保证生成的数据结构可以被 HistoryPanel 与上层 API 正确消费;当某些表缺失时, DataSource 本身不会自动下载数据,而是配合 refill_data_source 等函数完成维护。 支持的文件类型与数据库种类及更多初始化细节,见文档「DataSource 与本地数据源」 相关章节。

示例

下面示例展示 DataSource 类名(稳定输出);实际使用时需要结合你的本地数据目录或数据库连接参数:

>>> import qteasy as qt
>>> qt.DataSource.__name__
'DataSource'
property all_basic_tables: list

获取所有基础数据表的清单

property all_data_tables: list

获取所有历史数据表(不包括调整表)的清单

property all_sys_tables: list

获取所有系统数据表的清单

property all_tables: list

获取所有数据表的清单

property allow_drop_table: bool

获取是否允许删除数据表

db_run_in_transaction(action: Callable[[...], Any], *args, **kwargs) Any[源代码]

在 DataSource 上执行最小事务封装(DB 真事务,file 为 no-op)。

参数:
  • action (Callable[..., Any]) – 需要在事务中执行的可调用对象

  • *args – 传递给 action 的位置参数

  • **kwargs – 传递给 action 的关键字参数

返回:

action 的返回值

返回类型:

Any

delete_sys_table_data(table: str, record_ids: (<class 'list'>, <class 'tuple'>)) int[源代码]

删除系统数据表中的某些记录,被删除的记录的ID使用列表或tuple传入

参数:
  • table (str) – 需要删除数据的表名

  • record_ids (list of int or tuple of int) – 需要删除的记录的ID列表

返回:

删除的记录数量

返回类型:

int

drop_empty_tables() int[源代码]

从datasource中删除所有空表,即行数为0的表

返回:

删除的表数量

返回类型:

int

drop_table_data(table)[源代码]

删除本地存储的数据表(操作不可撤销,谨慎使用) 如果数据源设置了allow_drop_table为False,则无法删除数据表并报错

参数:

table (str,) – 本地数据表的名称

返回类型:

None

抛出:

RuntimeError – 当数据源设置了allow_drop_table为False时,无法删除数据表并报错:

export_table_data(table, file_name=None, file_path=None, shares=None, start=None, end=None)[源代码]

将数据表中的数据读取出来之后导出到一个文件中,便于用户使用过程中小规模转移数据或察看数据

使用这个函数时,用户可以不用理会数据源的类型,只需要指定数据表名称,以及筛选条件即可 导出的数据会被保存为csv文件,用户可以自行指定文件名以及文件存储路径,如果不指定文件名, 则默认使用数据表名称作为文件名,如果不指定文件存储路径,则默认使用当前工作目录作为 文件存储路径

参数:
  • table (str) – 数据表名称

  • file_name (str, optional) – 导出的文件名,如果不指定,则默认使用数据表名称作为文件名

  • file_path (str, optional) – 导出的文件存储路径,如果不指定,则默认使用当前工作目录作为文件存储路径

  • shares (list of str, optional) – ts_code筛选条件,为空时给出所有记录

  • start (DateTime like, optional) – YYYYMMDD格式日期,为空时不筛选

  • end (Datetime like,optional) – YYYYMMDD格式日期,当start不为空时有效,筛选日期范围

返回:

file_path_name – 导出的文件的完整路径

返回类型:

str

get_all_basic_table_data(refresh_cache=False, raise_error=True)[源代码]

一个快速获取所有basic数据表的函数,通常情况缓存处理以加快速度 如果设置refresh_cache为True,则清空缓存并重新下载数据

参数:
  • refresh_cache (Bool, Default False) – 如果为True,则清空缓存并重新下载数据

  • raise_error (Bool, Default True) – 如果为True,则在数据表为空时抛出ValueError

返回类型:

DataFrame

get_data_table_size(table, human=True, string_form=True)[源代码]

获取数据表占用磁盘空间的大小

参数:
  • table (str) – 数据表名称

  • human (bool, default True) – True时显示容易阅读的形式,如1.5MB而不是1590868, False时返回字节数

  • string_form (bool, default True) – True时以字符串形式返回结果,便于打印

返回:

tuple (size, rows)

返回类型:

tuple of int or str:

get_sys_table_last_id(table)[源代码]

从已有的table中获取最后一个id

参数:

table (str) – 数据表名称

返回:

last_id

返回类型:

int 当前使用的最后一个ID(自增ID)

get_table_data_coverage(table, column, min_max_only=False)[源代码]

获取本地数据表内容的覆盖范围,取出数据表的”column”列中的去重值并返回

参数:
  • table (str,) – 数据表的名称

  • column (str or list of str) – 需要去重并返回的数据列

  • min_max_only (bool, default False) – 为True时不需要返回整个数据列,仅返回最大值和最小值 如果仅返回最大值和和最小值,返回值为一个包含两个元素的列表, 第一个元素是最小值,第二个是最大值,第三个是总数量

返回类型:

List, 代表数据覆盖范围的列表

示例

>>> import qteasy
>>> qteasy.QT_DATA_SOURCE.get_table_data_coverage('stock_daily', 'ts_code')
Out:
['000001.SZ',
 '000002.SZ',
 '000003.SZ',
 '000004.SZ',
 '000005.SZ',
 '000006.SZ',
 ...,
 '002407.SZ',
 '002408.SZ',
 '002409.SZ',
 '002410.SZ',
 '002411.SZ',
 ...]
>>> import qteasy as qt
>>> qt.QT_DATA_SOURCE.get_table_data_coverage('stock_daily', 'ts_code', min_max_only=True)
Out:
['000001.SZ', '873593.BJ']
get_table_info(table, verbose=True, print_info=True, human=True) dict[源代码]
获取并打印数据表的相关信息,包括数据表是否已有数据,数据量大小,占用磁盘空间、数据覆盖范围,

以及数据下载方法

3. Parameters:

table: str

数据表名称

verbose: bool, Default: True

是否显示更多信息,如是,显示表结构等信息

print_info: bool, Default: True

是否打印输出所有结果

human: bool, Default: True

是否给出容易阅读的字符串形式

returns:
  • 一个dict,包含数据表的结构化信息:

  • { – table name: 1, str, 数据表名称 table_exists: 2, bool,数据表是否存在 table_size: 3, int/str,数据表占用磁盘空间,human 为True时返回容易阅读的字符串 table_rows: 4, int/str,数据表的行数,human 为True时返回容易阅读的字符串 primary_key1: 5, str,数据表第一个主键名称 pk_count1: 6, int,数据表第一个主键记录数量 pk_min1: 7, obj,数据表主键1起始记录 pk_max1: 8, obj,数据表主键2最终记录 primary_key2: 9, str,数据表第二个主键名称 pk_count2: 10, int,数据表第二个主键记录 pk_min2: 11, obj,数据表主键2起始记录 pk_max2: 12, obj,数据表主键2最终记录

  • }

info()[源代码]

格式化打印database对象的各种主要信息

insert_sys_table_data(table: str, **data) int[源代码]

插入系统操作表的数据

一次插入一条记录,数据以dict形式给出 不需要给出数据的ID,因为ID会自动生成 如果给出的数据字段不完整,则抛出异常 如果给出的数据中有不可用的字段,则抛出异常

参数:
  • table (str) – 需要更新的数据表名称

  • data (dict) – 需要更新或插入的数据,数据的key必须与数据库表的字段相同,否则会抛出异常

返回:

record_id – 更新的记录ID

返回类型:

int

抛出:

KeyError – 当给出的字段不完整或者有不可用的字段时:

none_sys_tables() list[源代码]

获取所有非系统数据表的清单

overview(tables=None, print_out=True, include_sys_tables=False) DataFrame[源代码]

以表格形式列出所有数据表的当前数据状态

参数:
  • tables (str or list of str, Default None) – 指定要列出的数据表,如果为None则列出所有数据表

  • print_out (bool, Default True) – 是否打印数据表总揽

  • include_sys_tables (bool, Default False) – 是否包含系统表

返回类型:

pd.DataFrame, 包含所有数据表的数据状态

read_cached_table_data(table: str, *, shares: str = None, start: str = None, end: str = None, primary_key_in_index: bool = True) DataFrame[源代码]

缓存数据表数据以缩短读取速度,这个函数用于加快本地提取数据时加速

在用户使用DataType对象大量读取数据时,通常需要重复从同一张数据表中以同样的参数获取数据 为了提升读取速度,可以将数据表的数据缓存到内存中,以减少读取时间,但在正常的数据表操作中 并不适合使用缓存,因为数据表通常需要实时刷新,因此本函数仅供DataType对象读取数据使用

read_sys_table_data(table, **kwargs) DataFrame[源代码]

读取系统操作表的数据,包括读取所有记录,以及根据给定的条件读取记录

返回的数据类型为pd.DataFrame,如果给出kwargs,返回根据条件筛选后的数据

参数:
  • table (str) – 需要读取的数据表名称

  • kwargs (dict) – 筛选数据的条件,包括用作筛选条件的字典如: {account_id = 123}

返回:

返回的数据为DataFrame,如果给出kwargs,返回的数据仅包括筛选后的数据

返回类型:

pd.DataFrame

read_sys_table_record(table, *, record_id: int, **kwargs) dict[源代码]

读取系统操作表的数据,根据指定的id读取数据,返回一个dict

本函数调用read_sys_table_data()读取整个数据表,并返回record_id行的数据 返回的dict包含所有字段的值,key为字段名,value为字段值

参数:
  • table (str) – 需要读取的数据表名称

  • record_id (int) – 需要读取的数据的id

  • kwargs (dict) – 筛选数据的条件,包括用作筛选条件的字典如: account_id = 123

返回:

data – 读取的数据,包括数据表的结构化信息以及数据表中的记录

返回类型:

dict

read_table_data(table, *, shares: Optional[Union[str, list]] = None, start: Optional[str] = None, end: Optional[str] = None, primary_key_in_index: bool = True) DataFrame[源代码]

从本地数据表中读取数据并返回DataFrame,不修改数据格式,primary_key为DataFrame的index

在读取数据表时读取所有的列,但是返回值筛选ts_code以及trade_date between start 和 end

参数:
  • table (str) – 数据表名称

  • shares (str or list of str,) – ts_code筛选条件,逗号分隔字符串,为空时给出所有记录

  • start (str,) – YYYYMMDD格式日期,为空时不筛选

  • end (str,) – YYYYMMDD格式日期,当start不为空时有效,筛选日期范围

  • primary_key_in_index (bool, default True) – 是否将primary key设置为DataFrame的index 如果为False,primary key将作为普通的列返回,此时DataFrame的index为默认的整数index

返回类型:

pd.DataFrame 返回数据表中的数据

reconnect()[源代码]
当数据库超时或其他原因丢失连接时,Ping数据库检查状态,

如果可行的话,重新连接数据库

返回:

  • True (连接成功)

  • False (连接失败)

table_data_exists(table)[源代码]

逻辑层函数,判断数据表是否存在

参数:

table (数据表名称) –

返回:

bool

返回类型:

True if table exists, False otherwise

property tables: list

所有已经建立的tables的清单

update_sys_table_data(table: str, record_id: int, **data) int[源代码]

更新系统操作表的数据,根据指定的id更新数据,更新的内容由kwargs给出。

每次只能更新一条数据,数据以dict形式给出 可以更新一个或多个字段,如果给出的字段不存在,则抛出异,id不可更新。 id必须存在,否则抛出异常

参数:
  • table (str) – 需要更新的数据表名称

  • record_id (int) – 需要更新的数据的id

  • data (dict) – 需要更新的数据,包括需要更新的字段如: account_id = 123

返回:

id – 更新的记录ID

返回类型:

int

抛出:
  • KeyError – 当给出的id不存在或为None时:

  • KeyError – 当给出的字段不存在时:

update_table_data(table, df, merge_type='update') int[源代码]

检查输入的df,去掉不符合要求的列或行后,将数据合并到table中,包括以下步骤:

1,检查下载后的数据表的列名是否与数据表的定义相同,删除多余的列 2,如果datasource type是”db”,删除下载数据中与本地数据重复的部分,仅保留新增数据 3,如果datasource type是”file”,将下载的数据与本地数据合并并去重 返回处理完毕的dataFrame

参数:
  • table (str,) – 数据表名,必须是database中定义的数据表

  • merge_type (str) – 指定如何合并下载数据和本地数据: - ‘update’: 默认值,如果下载数据与本地数据重复,用下载数据替代本地数据 - ‘ignore’ : 如果下载数据与本地数据重复,忽略重复部分

  • df (pd.DataFrame) – 通过传递一个DataFrame获取数据 如果数据获取渠道为”df”,则必须给出此参数

返回类型:

int, 写入数据表中的数据的行数

write_table_data(df, table, on_duplicate='ignore')[源代码]

将df中的数据写入本地数据表(本地文件或数据库)

如果本地数据表不存在则新建数据表,如果本地数据表已经存在,则将df数据添加在本地表中 如果添加的数据主键与已有的数据相同,处理方式由on_duplicate参数确定

参数:
  • df (pd.DataFrame) – 一个数据表,数据表的列名应该与本地数据表定义一致

  • table (str) – 本地数据表名,

  • on_duplicate (str) – 重复数据处理方式(仅当mode==db的时候有效) -ignore: 默认方式,将全部数据写入数据库表的末尾 -update: 将数据写入数据库表中,如果遇到重复的pk则修改表中的内容

返回:

int

返回类型:

写入的数据条数

备注

注意!!不应直接使用该函数将数据写入本地数据库,因为写入的数据不会被检查 请使用update_table_data()来更新或写入数据到本地