3. 歷史數據的存儲——DataSource對象

DataSource 負責對接本地歷史數據存儲,可以管理基於文件的存儲(如 csv/hdf/feather) 或基於數據庫的存儲(如 MySQL/MariaDB),對上層暴露統一的表結構與讀取接口;數據的 下載與更新通常通過 qt.refill_data_source() 完成,而非由 DataSource 自動拉取。

class qteasy.DataSource(source_type: str = 'file', file_type: str = 'csv', file_loc: str = 'data/', host: str = 'localhost', port: int = 3306, user: Optional[str] = None, password: Optional[str] = None, db_name: str = 'qt_db', allow_drop_table: bool = False)[原始碼]

管理本地歷史數據存儲(文件或數據庫)的統一入口對象。

DataSource 負責與本地文件或數據庫交互,統一管理歷史數據表的讀取、寫入與概覽, 並保證生成的數據結構可以被 HistoryPanel 與上層 API 正確消費;當某些表缺失時, DataSource 本身不會自動下載數據,而是配合 refill_data_source 等函數完成維護。 支持的文件類型與數據庫種類及更多初始化細節,見文檔「DataSource 與本地數據源」 相關章節。

範例

下面示例展示 DataSource 類名(穩定輸出);實際使用時需要結合你的本地數據目錄或數據庫連接參數:

>>> import qteasy as qt
>>> qt.DataSource.__name__
'DataSource'
property all_basic_tables: list

獲取所有基礎數據表的清單

property all_data_tables: list

獲取所有歷史數據表(不包括調整表)的清單

property all_sys_tables: list

獲取所有系統數據表的清單

property all_tables: list

獲取所有數據表的清單

property allow_drop_table: bool

獲取是否允許刪除數據表

db_run_in_transaction(action: Callable[[...], Any], *args, **kwargs) Any[原始碼]

在 DataSource 上執行最小事務封裝(DB 真事務,file 爲 no-op)。

參數:
  • action (Callable[..., Any]) – 需要在事務中執行的可調用對象

  • *args – 傳遞給 action 的位置參數

  • **kwargs – 傳遞給 action 的關鍵字參數

回傳:

action 的返回值

回傳型別:

Any

delete_sys_table_data(table: str, record_ids: (<class 'list'>, <class 'tuple'>)) int[原始碼]

刪除系統數據表中的某些記錄,被刪除的記錄的ID使用列表或tuple傳入

參數:
  • table (str) – 需要刪除數據的表名

  • record_ids (list of int or tuple of int) – 需要刪除的記錄的ID列表

回傳:

刪除的記錄數量

回傳型別:

int

drop_empty_tables() int[原始碼]

從datasource中刪除所有空表,即行數爲0的表

回傳:

刪除的表數量

回傳型別:

int

drop_table_data(table)[原始碼]

刪除本地存儲的數據表(操作不可撤銷,謹慎使用) 如果數據源設置了allow_drop_table爲False,則無法刪除數據表並報錯

參數:

table (str,) – 本地數據表的名稱

回傳型別:

None

引發:

RuntimeError – 當數據源設置了allow_drop_table爲False時,無法刪除數據表並報錯:

export_table_data(table, file_name=None, file_path=None, shares=None, start=None, end=None)[原始碼]

將數據表中的數據讀取出來之後導出到一個文件中,便於用戶使用過程中小規模轉移數據或察看數據

使用這個函數時,用戶可以不用理會數據源的類型,只需要指定數據表名稱,以及篩選條件即可 導出的數據會被保存爲csv文件,用戶可以自行指定文件名以及文件存儲路徑,如果不指定文件名, 則默認使用數據表名稱作爲文件名,如果不指定文件存儲路徑,則默認使用當前工作目錄作爲 文件存儲路徑

參數:
  • table (str) – 數據表名稱

  • file_name (str, optional) – 導出的文件名,如果不指定,則默認使用數據表名稱作爲文件名

  • file_path (str, optional) – 導出的文件存儲路徑,如果不指定,則默認使用當前工作目錄作爲文件存儲路徑

  • shares (list of str, optional) – ts_code篩選條件,爲空時給出所有記錄

  • start (DateTime like, optional) – YYYYMMDD格式日期,爲空時不篩選

  • end (Datetime like,optional) – YYYYMMDD格式日期,當start不爲空時有效,篩選日期範圍

回傳:

file_path_name – 導出的文件的完整路徑

回傳型別:

str

get_all_basic_table_data(refresh_cache=False, raise_error=True)[原始碼]

一個快速獲取所有basic數據表的函數,通常情況緩存處理以加快速度 如果設置refresh_cache爲True,則清空緩存並重新下載數據

參數:
  • refresh_cache (Bool, Default False) – 如果爲True,則清空緩存並重新下載數據

  • raise_error (Bool, Default True) – 如果爲True,則在數據表爲空時拋出ValueError

回傳型別:

DataFrame

get_data_table_size(table, human=True, string_form=True)[原始碼]

獲取數據表佔用磁盤空間的大小

參數:
  • table (str) – 數據表名稱

  • human (bool, default True) – True時顯示容易閱讀的形式,如1.5MB而不是1590868, False時返回字節數

  • string_form (bool, default True) – True時以字符串形式返回結果,便於列印

回傳:

tuple (size, rows)

回傳型別:

tuple of int or str:

get_sys_table_last_id(table)[原始碼]

從已有的table中獲取最後一個id

參數:

table (str) – 數據表名稱

回傳:

last_id

回傳型別:

int 当前使用的最后一个ID(自增ID)

get_table_data_coverage(table, column, min_max_only=False)[原始碼]

獲取本地數據表內容的覆蓋範圍,取出數據表的」column」列中的去重值並返回

參數:
  • table (str,) – 數據表的名稱

  • column (str or list of str) – 需要去重並返回的數據列

  • min_max_only (bool, default False) – 爲True時不需要返回整個數據列,僅返回最大值和最小值 如果僅返回最大值和和最小值,返回值爲一個包含兩個元素的列表, 第一個元素是最小值,第二個是最大值,第三個是總數量

回傳型別:

List, 代表数据覆盖范围的列表

範例

>>> import qteasy
>>> qteasy.QT_DATA_SOURCE.get_table_data_coverage('stock_daily', 'ts_code')
Out:
['000001.SZ',
 '000002.SZ',
 '000003.SZ',
 '000004.SZ',
 '000005.SZ',
 '000006.SZ',
 ...,
 '002407.SZ',
 '002408.SZ',
 '002409.SZ',
 '002410.SZ',
 '002411.SZ',
 ...]
>>> import qteasy as qt
>>> qt.QT_DATA_SOURCE.get_table_data_coverage('stock_daily', 'ts_code', min_max_only=True)
Out:
['000001.SZ', '873593.BJ']
get_table_info(table, verbose=True, print_info=True, human=True) dict[原始碼]
獲取並列印數據表的相關資訊,包括數據表是否已有數據,數據量大小,佔用磁盤空間、數據覆蓋範圍,

以及數據下載方法

3. Parameters:

table: str

數據表名稱

verbose: bool, Default: True

是否顯示更多資訊,如是,顯示錶結構等資訊

print_info: bool, Default: True

是否列印輸出所有結果

human: bool, Default: True

是否給出容易閱讀的字符串形式

returns:
  • 一個dict,包含數據表的結構化資訊:

  • { – table name: 1, str, 數據表名稱 table_exists: 2, bool,數據表是否存在 table_size: 3, int/str,數據表佔用磁盤空間,human 爲True時返回容易閱讀的字符串 table_rows: 4, int/str,數據表的行數,human 爲True時返回容易閱讀的字符串 primary_key1: 5, str,數據表第一個主鍵名稱 pk_count1: 6, int,數據表第一個主鍵記錄數量 pk_min1: 7, obj,數據表主鍵1起始記錄 pk_max1: 8, obj,數據表主鍵2最終記錄 primary_key2: 9, str,數據表第二個主鍵名稱 pk_count2: 10, int,數據表第二個主鍵記錄 pk_min2: 11, obj,數據表主鍵2起始記錄 pk_max2: 12, obj,數據表主鍵2最終記錄

  • }

info()[原始碼]

格式化列印database對象的各種主要資訊

insert_sys_table_data(table: str, **data) int[原始碼]

插入系統操作表的數據

一次插入一條記錄,數據以dict形式給出 不需要給出數據的ID,因爲ID會自動生成 如果給出的數據字段不完整,則拋出異常 如果給出的數據中有不可用的字段,則拋出異常

參數:
  • table (str) – 需要更新的數據表名稱

  • data (dict) – 需要更新或插入的數據,數據的key必須與數據庫表的字段相同,否則會拋出異常

回傳:

record_id – 更新的記錄ID

回傳型別:

int

引發:

KeyError – 當給出的字段不完整或者有不可用的字段時:

none_sys_tables() list[原始碼]

獲取所有非系統數據表的清單

overview(tables=None, print_out=True, include_sys_tables=False) DataFrame[原始碼]

以表格形式列出所有數據表的當前數據狀態

參數:
  • tables (str or list of str, Default None) – 指定要列出的數據表,如果爲None則列出所有數據表

  • print_out (bool, Default True) – 是否列印數據表總攬

  • include_sys_tables (bool, Default False) – 是否包含系統表

回傳型別:

pd.DataFrame, 包含所有数据表的数据状态

read_cached_table_data(table: str, *, shares: str = None, start: str = None, end: str = None, primary_key_in_index: bool = True) DataFrame[原始碼]

緩存數據表數據以縮短讀取速度,這個函數用於加快本地提取數據時加速

在用戶使用DataType對象大量讀取數據時,通常需要重複從同一張數據表中以同樣的參數獲取數據 爲了提升讀取速度,可以將數據表的數據緩存到內存中,以減少讀取時間,但在正常的數據表操作中 並不適合使用緩存,因爲數據表通常需要實時刷新,因此本函數僅供DataType對象讀取數據使用

read_sys_table_data(table, **kwargs) DataFrame[原始碼]

讀取系統操作表的數據,包括讀取所有記錄,以及根據給定的條件讀取記錄

返回的數據類型爲pd.DataFrame,如果給出kwargs,返回根據條件篩選後的數據

參數:
  • table (str) – 需要讀取的數據表名稱

  • kwargs (dict) – 篩選數據的條件,包括用作篩選條件的字典如: {account_id = 123}

回傳:

返回的數據爲DataFrame,如果給出kwargs,返回的數據僅包括篩選後的數據

回傳型別:

pd.DataFrame

read_sys_table_record(table, *, record_id: int, **kwargs) dict[原始碼]

讀取系統操作表的數據,根據指定的id讀取數據,返回一個dict

本函數調用read_sys_table_data()讀取整個數據表,並返回record_id行的數據 返回的dict包含所有字段的值,key爲字段名,value爲字段值

參數:
  • table (str) – 需要讀取的數據表名稱

  • record_id (int) – 需要讀取的數據的id

  • kwargs (dict) – 篩選數據的條件,包括用作篩選條件的字典如: account_id = 123

回傳:

data – 讀取的數據,包括數據表的結構化資訊以及數據表中的記錄

回傳型別:

dict

read_table_data(table, *, shares: Optional[Union[str, list]] = None, start: Optional[str] = None, end: Optional[str] = None, primary_key_in_index: bool = True) DataFrame[原始碼]

從本地數據表中讀取數據並返回DataFrame,不修改數據格式,primary_key爲DataFrame的index

在讀取數據表時讀取所有的列,但是返回值篩選ts_code以及trade_date between start 和 end

參數:
  • table (str) – 數據表名稱

  • shares (str or list of str,) – ts_code篩選條件,逗號分隔字符串,爲空時給出所有記錄

  • start (str,) – YYYYMMDD格式日期,爲空時不篩選

  • end (str,) – YYYYMMDD格式日期,當start不爲空時有效,篩選日期範圍

  • primary_key_in_index (bool, default True) – 是否將primary key設置爲DataFrame的index 如果爲False,primary key將作爲普通的列返回,此時DataFrame的index爲默認的整數index

回傳型別:

pd.DataFrame 返回数据表中的数据

reconnect()[原始碼]
當數據庫超時或其他原因丟失連接時,Ping數據庫檢查狀態,

如果可行的話,重新連接數據庫

回傳:

  • True (連接成功)

  • False (連接失敗)

table_data_exists(table)[原始碼]

邏輯層函數,判斷數據表是否存在

參數:

table (数据表名称) –

回傳:

bool

回傳型別:

True if table exists, False otherwise

property tables: list

所有已經建立的tables的清單

update_sys_table_data(table: str, record_id: int, **data) int[原始碼]

更新系統操作表的數據,根據指定的id更新數據,更新的內容由kwargs給出。

每次只能更新一條數據,數據以dict形式給出 可以更新一個或多個字段,如果給出的字段不存在,則拋出異,id不可更新。 id必須存在,否則拋出異常

參數:
  • table (str) – 需要更新的數據表名稱

  • record_id (int) – 需要更新的數據的id

  • data (dict) – 需要更新的數據,包括需要更新的字段如: account_id = 123

回傳:

id – 更新的記錄ID

回傳型別:

int

引發:
  • KeyError – 當給出的id不存在或爲None時:

  • KeyError – 當給出的字段不存在時:

update_table_data(table, df, merge_type='update') int[原始碼]

檢查輸入的df,去掉不符合要求的列或行後,將數據合併到table中,包括以下步驟:

1,檢查下載後的數據表的列名是否與數據表的定義相同,刪除多餘的列 2,如果datasource type是」db」,刪除下載數據中與本地數據重複的部分,僅保留新增數據 3,如果datasource type是」file」,將下載的數據與本地數據合併並去重 返回處理完畢的dataFrame

參數:
  • table (str,) – 數據表名,必須是database中定義的數據表

  • merge_type (str) –

    指定如何合并下载数据和本地数据: - 『update』: 默认值,如果下载数据与本地数据重复,用下载数据替代本地数据;

    table_usage=='basics' 的表在重复主键上仅当下载字段非空时才覆盖该列(patch), 避免稀疏通道(如 AKShare)用空行业/日期等冲掉既有 Tushare 元数据

    • 』ignore』 : 如果下载数据与本地数据重复,忽略重复部分

  • df (pd.DataFrame) – 通過傳遞一個DataFrame獲取數據 如果數據獲取渠道爲」df」,則必須給出此參數

回傳型別:

int, 写入数据表中的数据的行数

write_table_data(df, table, on_duplicate='ignore')[原始碼]

將df中的數據寫入本地數據表(本地文件或數據庫)

如果本地數據表不存在則新建數據表,如果本地數據表已經存在,則將df數據添加在本地表中 如果添加的數據主鍵與已有的數據相同,處理方式由on_duplicate參數確定

參數:
  • df (pd.DataFrame) – 一個數據表,數據表的列名應該與本地數據表定義一致

  • table (str) – 本地數據表名,

  • on_duplicate (str) – 重複數據處理方式(僅當mode==db的時候有效) -ignore: 默認方式,將全部數據寫入數據庫表的末尾 -update: 將數據寫入數據庫表中,如果遇到重複的pk則修改表中的內容

回傳:

int

回傳型別:

写入的数据条数

備註

注意!!不應直接使用該函數將數據寫入本地數據庫,因爲寫入的數據不會被檢查 請使用update_table_data()來更新或寫入數據到本地