3. Managing Historical Data - DataSource Object

DataSource is responsible for interfacing with local historical data storage. It can manage file-based storage (such as csv/hdf/feather) or database-based storage (such as MySQL/MariaDB), and exposes a unified table schema and read interface to upper layers. Data downloading and updates are typically performed via qt.refill_data_source() rather than being pulled automatically by DataSource.

class qteasy.DataSource(source_type: str = 'file', file_type: str = 'csv', file_loc: str = 'data/', host: str = 'localhost', port: int = 3306, user: Optional[str] = None, password: Optional[str] = None, db_name: str = 'qt_db', allow_drop_table: bool = False)[source]

A unified entry-point object for managing local historical data storage (files or databases).

DataSource is responsible for interacting with local files or databases, centrally managing the reading, writing, and overview of historical data tables, and ensuring that the generated data structures can be correctly consumed by HistoryPanel and upper-layer APIs. When some tables are missing, DataSource itself will not automatically download data; instead, it works with functions such as refill_data_source to perform maintenance. For supported file types, database types, and more initialization details, see the relevant section of the documentation, “DataSource and Local Data Sources”.

Examples

The example below shows the DataSource class name (stable output); in actual use, you need to combine it with your local data directory or database connection parameters:

>>> import qteasy as qt
>>> qt.DataSource.__name__
'DataSource'

property all_basic_tables: list

Get the list of all base data tables

property all_data_tables: list

Get the list of all historical data tables (excluding adjustment tables)

property all_sys_tables: list

Get the list of all system data tables

property all_tables: list

Get the list of all data tables

property allow_drop_table: bool

Get whether deleting data tables is allowed

db_run_in_transaction(action: Callable[[...], Any], *args, **kwargs) Any[source]

Run a minimal transaction wrapper on DataSource (real DB transaction; no-op for file storage).

Parameters:
  • action (Callable[..., Any]) – Callable to run inside the transaction

  • *args – Positional arguments passed to action

  • **kwargs – Keyword arguments passed to action

Returns:

Return value of action

Return type:

Any
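The commit-or-roll-back behaviour described above can be sketched with the standard-library sqlite3 module. This is only an illustration of the pattern, not qteasy's implementation: run_in_transaction, add_account, add_then_fail and the accounts table are hypothetical names introduced for the example.

```python
import sqlite3
from typing import Any, Callable


def run_in_transaction(conn: sqlite3.Connection,
                       action: Callable[..., Any],
                       *args: Any, **kwargs: Any) -> Any:
    """Run action(*args, **kwargs); commit on success, roll back on error."""
    try:
        result = action(*args, **kwargs)
    except Exception:
        conn.rollback()  # undo every statement issued by action
        raise
    conn.commit()
    return result


# Hypothetical in-memory database standing in for a real data source.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, cash REAL)")
conn.commit()


def add_account(cash: float) -> int:
    cur = conn.execute("INSERT INTO accounts (cash) VALUES (?)", (cash,))
    return cur.lastrowid


def add_then_fail(cash: float) -> None:
    conn.execute("INSERT INTO accounts (cash) VALUES (?)", (cash,))
    raise ValueError("simulated failure after the insert")


new_id = run_in_transaction(conn, add_account, 1000.0)

try:
    run_in_transaction(conn, add_then_fail, 500.0)
except ValueError:
    pass  # the insert made by add_then_fail has been rolled back

row_count = conn.execute("SELECT COUNT(*) FROM accounts").fetchone()[0]
```

Only the first insert survives, because the second action raised before the commit. For file storage the documented behaviour is a no-op wrapper, so action would simply be called and its result returned.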

delete_sys_table_data(table: str, record_ids: (<class 'list'>, <class 'tuple'>)) int[source]

Delete certain records from the system data table; pass the IDs of the records to be deleted as a list or tuple

Parameters:
  • table (str) – The name of the table whose data needs to be deleted

  • record_ids (list of int or tuple of int) – List of IDs of records to delete

Returns:

Number of records deleted

Return type:

int

drop_empty_tables() int[source]

Delete all empty tables from the datasource, i.e., tables with 0 rows

Returns:

Number of tables deleted

Return type:

int

drop_table_data(table)[source]

Delete a locally stored data table (this operation is irreversible; use with caution). If the data source has allow_drop_table set to False, the data table cannot be deleted and an error will be raised.

Parameters:

table (str) – Name of the local data table

Return type:

None

Raises:

RuntimeError – When the data source has allow_drop_table set to False, in which case tables cannot be deleted
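The guard described above can be sketched with a small in-memory stand-in. TableStore and its contents are hypothetical and exist only to show the allow_drop_table check; they are not part of qteasy.

```python
from typing import Dict, List, Tuple


class TableStore:
    """Hypothetical in-memory stand-in for a data source."""

    def __init__(self, allow_drop_table: bool = False) -> None:
        self.allow_drop_table = allow_drop_table
        self.tables: Dict[str, List[Tuple[str, str]]] = {
            "stock_daily": [("000001.SZ", "20240102")],
        }

    def drop_table_data(self, table: str) -> None:
        # Refuse to delete anything unless the flag was set at construction.
        if not self.allow_drop_table:
            raise RuntimeError("dropping tables is disabled for this source")
        self.tables.pop(table, None)


store = TableStore(allow_drop_table=False)
try:
    store.drop_table_data("stock_daily")
    blocked = False
except RuntimeError:
    blocked = True  # the table is untouched

permissive = TableStore(allow_drop_table=True)
permissive.drop_table_data("stock_daily")
```

Keeping the flag False by default means an irreversible deletion requires an explicit opt-in when the object is created.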

export_table_data(table, file_name=None, file_path=None, shares=None, start=None, end=None)[source]

After reading the data from the data table, export it to a file, making it convenient for users to transfer small amounts of data or view data during use.

When using this function, users do not need to care about the type of data source. They only need to specify the table name and filter conditions. The exported data will be saved as a CSV file. Users can specify the file name and storage path themselves. If no file name is specified, the table name is used as the default file name. If no storage path is specified, the current working directory is used as the default storage path.

Parameters:
  • table (str) – Table name

  • file_name (str, optional) – Exported file name. If not specified, the table name is used as the file name by default

  • file_path (str, optional) – Storage path for the exported file. If not specified, the current working directory is used by default as the file storage path.

  • shares (list of str, optional) – ts_code filter condition; if empty, all records are returned

  • start (datetime-like, optional) – Date in YYYYMMDD format; if empty, no filtering is applied

  • end (datetime-like, optional) – Date in YYYYMMDD format. Effective when start is not empty; used to filter the date range.

Returns:

file_path_name – Full path of the exported file

Return type:

str
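The default file name and path rules above can be sketched as follows. resolve_export_path is a hypothetical helper, and appending a ".csv" extension when it is missing is an assumption; the description only says the table name is used as the default file name and that the export is a CSV file.

```python
import os
from typing import Optional


def resolve_export_path(table: str,
                        file_name: Optional[str] = None,
                        file_path: Optional[str] = None) -> str:
    """Return the full path an export would be written to."""
    # Fall back to the table name when no file name is given.
    name = file_name if file_name is not None else table
    if not name.lower().endswith(".csv"):
        name += ".csv"  # assumption: extension is added when missing
    # Fall back to the current working directory when no path is given.
    folder = file_path if file_path is not None else os.getcwd()
    return os.path.join(folder, name)


default_target = resolve_export_path("stock_daily")
custom_target = resolve_export_path("stock_daily", "sample.csv", "exports")
```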

get_all_basic_table_data(refresh_cache=False, raise_error=True)[source]

A function to quickly retrieve all basic data tables. Typically, caching is used to speed things up. If refresh_cache is set to True, the cache is cleared and the data is downloaded again.

Parameters:
  • refresh_cache (Bool, Default False) – If True, clear the cache and re-download the data.

  • raise_error (Bool, Default True) – If True, raise ValueError when the data table is empty.

Return type:

DataFrame

get_data_table_size(table, human=True, string_form=True)[source]

Get the amount of disk space used by the data table.

Parameters:
  • table (str) – Table name

  • human (bool, default True) – When True, display in an easy-to-read format, such as 1.5MB instead of 1590868; when False, return the number of bytes.

  • string_form (bool, default True) – When True, return the result as a string for easier printing

Returns:

tuple (size, rows)

Return type:

tuple of int or str
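The human option can be illustrated with a small formatter. human_size is a hypothetical helper, and the use of 1024-based units is an assumption chosen because it reproduces the "1.5MB instead of 1590868" example given above.

```python
def human_size(num_bytes: int) -> str:
    """Format a byte count as a short string such as '1.5MB'."""
    units = ("B", "KB", "MB", "GB", "TB")
    size = float(num_bytes)
    index = 0
    # Divide by 1024 until the value fits the current unit.
    while size >= 1024 and index < len(units) - 1:
        size /= 1024
        index += 1
    if index == 0:
        return f"{num_bytes}B"  # plain bytes need no decimal part
    return f"{size:.1f}{units[index]}"


readable = human_size(1590868)
```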

get_sys_table_last_id(table)[source]

Get the last ID from an existing table

Parameters:

table (str) – Table name

Returns:

last_id

Return type:

int: the last ID currently in use (auto-increment ID)

get_table_data_coverage(table, column, min_max_only=False)[source]

Get the coverage range of the local data table content: retrieve the deduplicated values from the table’s “column” column and return them.

Parameters:
  • table (str,) – Name of the data table

  • column (str or list of str) – Data columns that need to be deduplicated and returned

  • min_max_only (bool, default False) – When True, the full list of values is not returned; only the extremes are. In that case the return value is a list whose first element is the minimum value and whose second element is the maximum value; a third element holding the total count may follow.

Return type:

list: a list representing the data coverage range

Examples

>>> import qteasy
>>> qteasy.QT_DATA_SOURCE.get_table_data_coverage('stock_daily', 'ts_code')
Out:
['000001.SZ',
 '000002.SZ',
 '000003.SZ',
 '000004.SZ',
 '000005.SZ',
 '000006.SZ',
 ...,
 '002407.SZ',
 '002408.SZ',
 '002409.SZ',
 '002410.SZ',
 '002411.SZ',
 ...]
>>> import qteasy as qt
>>> qt.QT_DATA_SOURCE.get_table_data_coverage('stock_daily', 'ts_code', min_max_only=True)
Out:
['000001.SZ', '873593.BJ']
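The deduplicate-then-summarise idea behind these two calls can be sketched in plain Python. column_coverage is a hypothetical helper working on an in-memory list; it returns only the minimum and maximum when min_max_only is True, matching the two-element output shown above.

```python
from typing import Iterable, List


def column_coverage(values: Iterable[str],
                    min_max_only: bool = False) -> List[str]:
    """Return the sorted distinct values, or just their extremes."""
    unique_values = sorted(set(values))
    if min_max_only:
        if not unique_values:
            return []
        return [unique_values[0], unique_values[-1]]
    return unique_values


codes = ["000002.SZ", "000001.SZ", "873593.BJ", "000001.SZ"]
all_codes = column_coverage(codes)
extremes = column_coverage(codes, min_max_only=True)
```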

get_table_info(table, verbose=True, print_info=True, human=True) dict[source]

Retrieve and print information about the table, including whether it already contains data, the data volume, disk space usage, data coverage range, and the data download method.

Parameters:
  • table (str) – Table name

  • verbose (bool, default True) – Whether to display more information; if so, show the table schema and other details.

  • print_info (bool, default True) – Whether to print all results.

  • human (bool, default True) – Whether to provide an easy-to-read string representation.

Returns:

A dict containing structured information about the data table, with the following keys in order:

  • table name (str) – table name

  • table_exists (bool) – whether the table exists

  • table_size (int or str) – disk space used by the table; when human is True, an easy-to-read string

  • table_rows (int or str) – number of rows in the table; when human is True, an easy-to-read string

  • primary_key1 (str) – name of the first primary key

  • pk_count1 (int) – record count for the first primary key

  • pk_min1 (obj) – starting record of primary key 1

  • pk_max1 (obj) – ending record of primary key 1

  • primary_key2 (str) – name of the second primary key

  • pk_count2 (int) – record count for the second primary key

  • pk_min2 (obj) – starting record of primary key 2

  • pk_max2 (obj) – ending record of primary key 2

info()[source]

Format and print various key information about the database object

insert_sys_table_data(table: str, **data) int[source]

Insert data into the system operations table.

Insert one record at a time; the data is provided as a dict. You do not need to provide the data ID, because the ID will be generated automatically. If the provided data fields are incomplete, an exception is raised. If the provided data contains unavailable fields, an exception is raised.

Parameters:
  • table (str) – Name of the table to update

  • data (dict) – Data to update or insert. The keys in the data must match the fields in the database table; otherwise, an exception will be thrown.

Returns:

record_id – ID of the inserted record

Return type:

int

Raises:

KeyError – When the provided fields are incomplete or contain unavailable fields
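The validation rules above (all fields required, no unknown fields, ID generated automatically) can be sketched with in-memory tables. SCHEMA, TABLES, insert_record, the demo_table name and its fields are all hypothetical; starting IDs at 1 is also an assumption.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical schema: table name -> required field names (ID excluded).
SCHEMA: Dict[str, Tuple[str, ...]] = {"demo_table": ("user_name", "cash_amount")}
TABLES: Dict[str, List[Dict[str, Any]]] = {"demo_table": []}


def insert_record(table: str, **data: Any) -> int:
    """Validate the fields, append one record, and return its new ID."""
    required = set(SCHEMA[table])
    missing = required - set(data)
    unknown = set(data) - required
    if missing:
        raise KeyError(f"missing fields: {sorted(missing)}")
    if unknown:
        raise KeyError(f"unavailable fields: {sorted(unknown)}")
    rows = TABLES[table]
    # Next ID is the last ID plus one (assumption: IDs start at 1).
    record_id = rows[-1]["id"] + 1 if rows else 1
    rows.append({"id": record_id, **data})
    return record_id


first_id = insert_record("demo_table", user_name="alice", cash_amount=1000.0)
second_id = insert_record("demo_table", user_name="bob", cash_amount=250.0)
```

Passing the record as keyword arguments mirrors the `**data` form of the documented signature, so a dict can be supplied with `**record`.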

none_sys_tables() list[source]

Get a list of all non-system tables

overview(tables=None, print_out=True, include_sys_tables=False) DataFrame[source]

List the current data status of all data tables in tabular form

Parameters:
  • tables (str or list of str, Default None) – Specify the data tables to list. If None, list all data tables.

  • print_out (bool, Default True) – Whether to print a table overview

  • include_sys_tables (bool, Default False) – Whether to include system tables

Return type:

pd.DataFrame containing the data status of all data tables

read_cached_table_data(table: str, *, shares: str = None, start: str = None, end: str = None, primary_key_in_index: bool = True) DataFrame[source]

Cache data table data to reduce read time. This function is used to speed up local data extraction.

When a user uses a DataType object to read large amounts of data, it is usually necessary to repeatedly fetch data from the same table with the same parameters. To improve read speed, you can cache the table data in memory to reduce read time. However, in normal table operations, caching is not suitable because tables usually need to be refreshed in real time. Therefore, this function is only for DataType objects to use when reading data.
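The effect of caching repeated reads with identical parameters can be sketched with functools.lru_cache from the standard library. This illustrates the general technique only; slow_read, cached_read and READ_COUNT are hypothetical names, and the source does not say which caching mechanism qteasy uses.

```python
from functools import lru_cache
from typing import Dict, Optional, Tuple

READ_COUNT: Dict[str, int] = {"calls": 0}


def slow_read(table: str, shares: Optional[str],
              start: Optional[str], end: Optional[str]) -> Tuple[str, ...]:
    """Stand-in for an expensive read; counts how often it really runs."""
    READ_COUNT["calls"] += 1
    return (table, shares or "", start or "", end or "")


@lru_cache(maxsize=32)
def cached_read(table: str, shares: Optional[str] = None,
                start: Optional[str] = None,
                end: Optional[str] = None) -> Tuple[str, ...]:
    # All arguments are strings or None, hence hashable cache keys.
    return slow_read(table, shares, start, end)


first = cached_read("stock_daily", shares="000001.SZ",
                    start="20240101", end="20240131")
second = cached_read("stock_daily", shares="000001.SZ",
                     start="20240101", end="20240131")
```

The second call is served from memory, so the expensive read runs once. A cache like this returns stale data if the table changes in the meantime, which is the reason given above for not using it in normal table operations.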

read_sys_table_data(table, **kwargs) DataFrame[source]

Read data from the system operations table, including reading all records and reading records based on the given conditions.

The returned data type is pd.DataFrame. If kwargs are provided, return the data filtered according to the conditions.

Parameters:
  • table (str) – Name of the data table to read.

  • kwargs (dict) – Conditions for filtering data, passed as keyword arguments, e.g. account_id=123.

Returns:

The returned data is a DataFrame. If kwargs are provided, the returned data includes only the filtered data.

Return type:

pd.DataFrame
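Filtering by keyword conditions can be sketched on a list of dicts. filter_records and the sample records are hypothetical, and treating each condition as an equality test combined with AND is an assumption about the filter semantics.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]


def filter_records(records: List[Record], **conditions: Any) -> List[Record]:
    """Keep the records whose fields equal every given condition."""
    return [
        record for record in records
        if all(record.get(field) == value
               for field, value in conditions.items())
    ]


RECORDS: List[Record] = [
    {"id": 1, "account_id": 123, "status": "open"},
    {"id": 2, "account_id": 123, "status": "closed"},
    {"id": 3, "account_id": 456, "status": "open"},
]

all_records = filter_records(RECORDS)  # no conditions: everything
by_account = filter_records(RECORDS, account_id=123)
open_only = filter_records(RECORDS, account_id=123, status="open")
```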

read_sys_table_record(table, *, record_id: int, **kwargs) dict[source]

Read data from the system operations table. Read data by the specified id and return a dict.

This function calls read_sys_table_data() to read the entire table and returns the data for the record_id row. The returned dict contains the values of all fields, where the key is the field name and the value is the field value.

Parameters:
  • table (str) – Name of the data table to read.

  • record_id (int) – ID of the data to read.

  • kwargs (dict) – Filtering conditions, passed as keyword arguments, e.g. account_id=123

Returns:

data – The data read, including the table’s schema information and the records in the table.

Return type:

dict

read_table_data(table, *, shares: Optional[Union[str, list]] = None, start: Optional[str] = None, end: Optional[str] = None, primary_key_in_index: bool = True) DataFrame[source]

Read data from a local table and return a DataFrame without modifying the data format; primary_key is used as the DataFrame index.

When reading the table, read all columns, but in the return value filter by ts_code and trade_date between start and end.

Parameters:
  • table (str) – Table name

  • shares (str or list of str) – ts_code filter condition: a comma-separated string or a list of strings. If empty, all records are returned.

  • start (str) – Date in YYYYMMDD format; if empty, no filtering is applied

  • end (str) – Date in YYYYMMDD format. Effective when start is not empty; used to filter the date range.

  • primary_key_in_index (bool, default True) – Whether to set the primary key as the DataFrame index. If False, the primary key will be returned as a regular column, and the DataFrame index will be the default integer index.

Return type:

pd.DataFrame: the data in the table
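The shares and date filtering described above can be sketched without pandas. filter_rows and ROWS are hypothetical. Treating a missing end as an open upper bound is an assumption; the description only says end takes effect when start is given.

```python
from typing import Dict, List, Optional, Union

Row = Dict[str, str]


def filter_rows(rows: List[Row],
                shares: Optional[Union[str, List[str]]] = None,
                start: Optional[str] = None,
                end: Optional[str] = None) -> List[Row]:
    """Filter rows by ts_code and by trade_date between start and end."""
    if isinstance(shares, str):
        # Accept a comma-separated string such as "000001.SZ, 000002.SZ".
        shares = [code.strip() for code in shares.split(",") if code.strip()]
    selected = []
    for row in rows:
        if shares and row["ts_code"] not in shares:
            continue
        if start is not None:
            # YYYYMMDD strings sort chronologically, so string
            # comparison is enough here.
            if row["trade_date"] < start:
                continue
            if end is not None and row["trade_date"] > end:
                continue
        selected.append(row)
    return selected


ROWS: List[Row] = [
    {"ts_code": "000001.SZ", "trade_date": "20240102"},
    {"ts_code": "000001.SZ", "trade_date": "20240215"},
    {"ts_code": "000002.SZ", "trade_date": "20240103"},
    {"ts_code": "000004.SZ", "trade_date": "20240104"},
]

january = filter_rows(ROWS, shares="000001.SZ, 000002.SZ",
                      start="20240101", end="20240131")
everything = filter_rows(ROWS)
```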

reconnect()[source]

When the database connection is lost due to a timeout or other reasons, ping the database to check its status and reconnect to it if possible.

Returns:

True if the connection succeeded, False if it failed

Return type:

bool

table_data_exists(table)[source]

Logic-layer function that determines whether the table exists

Parameters:

table (str) – Name of the data table

Returns:

True if table exists, False otherwise

Return type:

bool

property tables: list

A list of all tables that have already been created

update_sys_table_data(table: str, record_id: int, **data) int[source]

Update data in the system operations table. Update the data by the specified id; the update content is provided by kwargs.

Only one record can be updated at a time. Data is provided as a dict. One or more fields can be updated. If a provided field does not exist, an exception is raised. The id cannot be updated. The id must exist; otherwise an exception is raised.

Parameters:
  • table (str) – Name of the table to update

  • record_id (int) – The id of the data that needs to be updated

  • data (dict) – Data to be updated, including the fields to update, e.g.: account_id = 123

Returns:

id – ID of the updated record

Return type:

int

Raises:
  • KeyError – When the given id does not exist or is None

  • KeyError – When the provided field does not exist

update_table_data(table, df, merge_type='update') int[source]

Check the input df. After removing columns or rows that do not meet the requirements, merge the data into the table, including the following steps:

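1. Check whether the column names of the downloaded data match the table definition, and delete any extra columns.

2. If the datasource type is "db", remove the part of the downloaded data that duplicates local data and keep only the new data.

3. If the datasource type is "file", merge the downloaded data with the local data and deduplicate.

These steps produce the processed DataFrame.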

Parameters:
  • table (str) – Table name; must be a table defined in the database.

  • merge_type (str) – Specify how to merge downloaded data and local data:
      - 'update' (default): if downloaded data duplicates local data, replace local data with downloaded data
      - 'ignore': if downloaded data duplicates local data, ignore the duplicate part

  • df (pd.DataFrame) – Get data by passing in a DataFrame. If the data source channel is "df", this parameter must be provided.

Return type:

int: number of rows written to the data table
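The difference between the two merge_type values can be sketched with dicts keyed by primary key. merge_rows and the sample rows are hypothetical, and the sketch covers only the handling of duplicates, not the column checks or the storage-specific steps.

```python
from typing import Dict, Tuple

Key = Tuple[str, str]          # (ts_code, trade_date) primary key
Table = Dict[Key, float]       # primary key -> close price


def merge_rows(local: Table, downloaded: Table,
               merge_type: str = "update") -> Table:
    """Merge downloaded rows into local rows by primary key."""
    if merge_type not in ("update", "ignore"):
        raise ValueError(f"unknown merge_type: {merge_type!r}")
    merged = dict(local)
    for key, value in downloaded.items():
        if key in merged and merge_type == "ignore":
            continue  # keep the local row, drop the duplicate
        merged[key] = value  # new row, or 'update' overwrites local
    return merged


LOCAL: Table = {("000001.SZ", "20240102"): 9.21}
DOWNLOADED: Table = {
    ("000001.SZ", "20240102"): 9.30,  # duplicates a local primary key
    ("000001.SZ", "20240103"): 9.15,  # new row
}

updated = merge_rows(LOCAL, DOWNLOADED, merge_type="update")
ignored = merge_rows(LOCAL, DOWNLOADED, merge_type="ignore")
```

With 'update' the duplicated key takes the downloaded value; with 'ignore' the local value is kept. The new row is added in both cases.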

write_table_data(df, table, on_duplicate='ignore')[source]

Write the data in df to the local data table (local file or database).

If the local data table does not exist, create a new table. If the local data table already exists, append the df data to the local table. If the primary key of the appended data is the same as existing data, the handling method is determined by the on_duplicate parameter

Parameters:
  • df (pd.DataFrame) – A data table whose column names should be consistent with the local table definition.

  • table (str) – Local table name

  • on_duplicate (str) – How to handle duplicate data (only effective when mode==db):
      - 'ignore' (default): append all data to the end of the database table
      - 'update': write data into the database table; if a duplicate pk is encountered, modify the content in the table

Returns:

Number of records written

Return type:

int

Notes

Warning!! Do not use this function to write data directly to the local database, because the written data will not be validated. Please use update_table_data() to update or write data to the local database.