qteasy.backtest 源代码

# coding=utf-8
# ======================================
# File:     backtest.py
# Author:   Jackie PENG
# Contact:  jackie.pengzhao@gmail.com
# Created:  2024-03-12
# Desc:
#   Functions used for backtesting.
# ======================================

import logging
import time
import pandas as pd
import numpy as np
from numba import njit
from typing import Any, Union, Optional

from numpy import bool_, dtype, ndarray
from pandas import DataFrame

from qteasy.finance import CashPlan

from qteasy.utilfuncs import (
    str_to_list,
    FunctionTimer,
)

from qteasy.qt_operator import (
    Operator,
    SIGNAL_TYPE_ID,
)

from qteasy.evaluate import (
    evaluate,
)

from qteasy.finance import (
    apply_execution_slippage,
    get_selling_result,
    get_purchase_result,
)

from qteasy.visual import (
    _plot_loop_result,
    _loop_report_str,
)

from qteasy.trading_util import (
    parse_pt_signals,
    parse_ps_signals,
    parse_vs_signals,
    trim_pt_type_signals,
    sell_direction_adjustment,
    buy_direction_adjustment,
)


@njit(nogil=True, cache=True)
def backtest_step(
        signal_type: Union[int, np.int32, np.int64, np.ndarray],
        op_signal: np.ndarray,
        cash_inflation: np.ndarray,
        is_delivery_day: bool,
        day_num: Union[int, np.int32, np.int64, np.ndarray],
        own_cash: np.ndarray,
        own_amounts: np.ndarray,
        available_cash: np.ndarray,
        available_amounts: np.ndarray,
        trade_prices: np.ndarray,
        cost_params: np.ndarray,
        pt_buy_threshold: float,
        pt_sell_threshold: float,
        long_pos_limit: float,
        short_pos_limit: float,
        allow_sell_short: bool,
        moq_buy: float,
        moq_sell: float,
        cash_delivery_queue: np.ndarray,
        stock_delivery_queue: np.ndarray,
        cash_delivery_period: int,
        stock_delivery_period: int,
        share_count: int,
):
    """ 完成一次完整的单步骤回测计算,包括下面三个步骤:
    1,检查现金增值比例,如果大于0,则更新持有现金和可用现金
    2,调用calculate_trade_results()函数计算本次交易结果
    3,处理现金变动和持仓变动的交割,更新持有现金和可用现金,更新持有资产和可用资产
    更新账户现金和持仓余额后,返回交易后的各项数据和交割队列

    Parameters
    ----------
    signal_type: int
        信号类型:
            0 - PT signal
            1 - PS signal
            2 - VS signal
    op_signal: np.ndarray
        本次交易的个股交易信号
    cash_inflation: float
        现金增值比例
    is_delivery_day: bool
        是否为新的交割日,用于确定是否需要更新交割队列
    day_num: int
        当前的天数,用于确定交割队列中的交割位置
    own_cash: float
        本次交易开始前持有的现金余额(包括交割中的现金)
    own_amounts: np.ndarray
        本次交易开始前持有的资产份额(包括交割中的资产份额)
    available_cash: float
        本次交易开始前账户可用现金余额(交割中的现金不计入余额)
    available_amounts: np.ndarray:
        交易开始前各个股票的可用数量余额(交割中的股票不计入余额)
    trade_prices: np.ndarray
        本次交易发生时各个股票的交易价格
    cost_params: np.ndarray
        交易成本参数,包括买入费率、卖出费率、最低买入费用、最低卖出费用、交易滑点
        buy_rate: float, 交易成本:买入费率
        sell_rate: float, 交易成本:卖出费率
        buy_min: float, 交易成本:最低买入费用
        sell_min: float, 交易成本:最低卖出费用
        slippage: float, 交易成本:交易滑点
    pt_buy_threshold: float
        当交易信号类型为PT时,用于计算买入/卖出信号的强度阈值
    pt_sell_threshold: float
        当交易信号类型为PT时,用于计算买入/卖出信号的强度阈值
    long_pos_limit: float
        允许建立的多头总仓位与净资产的比值,默认值1.0,表示最多允许建立100%多头仓位
    short_pos_limit: float
        允许建立的空头总仓位与净资产的比值,默认值-1.0,表示最多允许建立100%空头仓位
    allow_sell_short: bool
        True:   允许买空卖空
        False:  只允许买入多头仓位
    moq_buy: float:
        投资产品最小买入交易单位,moq为0时允许交易任意数额的金融产品,moq不为零时允许交易的产品数量是moq的整数倍
    moq_sell: float:
        投资产品最小买入交易单位,moq为0时允许交易任意数额的金融产品,moq不为零时允许交易的产品数量是moq的整数倍
    cash_delivery_queue: np.ndarray
        现金交割队列
    stock_delivery_queue: np.ndarray
        股票交割队列
    cash_delivery_period: int
        现金交割周期
    stock_delivery_period: int
        股票交割周期
    share_count: int
        交易的股票数量

    Returns
    -------
    tuple: (next_own_cash, next_available_cash, next_own_amounts, next_available_amounts,
            current_trade_records, current_trade_cost, cash_delivery_queue, stock_delivery_queue)
        next_own_cash: float
            本次交易结束后持有的现金余额(包括交割中的现金)
        next_available_cash: float
            本次交易结束后账户可用现金余额(交割中的现金不计入余额)
        next_own_amounts: np.ndarray
            本次交易结束后持有的资产份额(包括交割中的资产份额)
        next_available_amounts: np.ndarray
            交易结束后各个股票的可用数量余额(交割中的股票不计入余额)
        current_trade_records: np.ndarray
            本次交易的交易记录
        current_trade_cost: np.ndarray
            本次交易的交易费用
        cash_delivery_queue: np.ndarray
            更新后的现金交割队列
        stock_delivery_queue: np.ndarray
            更新后的股票交割队列
    """

    # 1,如果现金增值比例大于0,则更新持有现金和可用现金
    if cash_inflation > 1.:
        own_cash *= cash_inflation
        available_cash *= cash_inflation
        # DEBUG
        # print(f'cash inflation applied: {cash_inflation:.4f}, new own_cash: {own_cash:.2f},
        # new available_cash: {available_cash:.2f}')

    # print(f'\ncalculating trade results for day {day_num}, signal_type: {signal_type}, op_signal: {op_signal}')
    # if day_num == 1781:
    #     import pdb; pdb.set_trace()

    # 2,调用backtest_step函数,计算本次交易的现金变动、持仓变动和交易费用
    cash_gained, cash_spent, amount_purchased, amount_sold, fees = calculate_trade_results(
            signal_type=signal_type,
            own_cash=own_cash,
            own_amounts=own_amounts,
            available_cash=available_cash,
            available_amounts=available_amounts,
            op_signal=op_signal,
            prices=trade_prices,
            cost_params=cost_params,
            pt_buy_threshold=pt_buy_threshold,
            pt_sell_threshold=pt_sell_threshold,
            long_pos_limit=long_pos_limit,
            short_pos_limit=short_pos_limit,
            allow_sell_short=allow_sell_short,
            moq_buy=moq_buy,
            moq_sell=moq_sell,
            cash_delivery_period=cash_delivery_period,
    )
    # DEBUG
    # print(f'calculated trade results'
    #       f'own_cash: {own_cash:.5f}, own_amounts: {own_amounts.round(6)}, \n'
    #       f'available_cash: {available_cash:.5f}, available_amounts: {available_amounts.round(6)}\n'
    #       f'trade results - cash_gained: {cash_gained.sum():.5f}, cash_spent: {cash_spent.sum():.5f}, '
    #       f'amount_purchased: {amount_purchased.sum():.5f}, amount_sold: {amount_sold.sum():.5f}, '
    #       f'fees: {fees.sum():.5f}')

    # 3,处理现金变动和持仓变动的交割,输出交割数据
    new_cash = cash_gained.sum()
    delivered_cash, delivered_stocks = process_backtest_delivery(
            cash_delivery_queue=cash_delivery_queue,
            stock_delivery_queue=stock_delivery_queue,
            is_new_day=is_delivery_day,
            day_num=day_num,
            new_cash=new_cash,
            new_stocks=amount_purchased,
            cash_delivery_period=cash_delivery_period,
            stock_delivery_period=stock_delivery_period,
            share_count=share_count,
    )
    # DEBUG
    # print(f'\nprocessing delivery for day {day_num}, is_delivery_day: {is_delivery_day}\n'
    #       f'cash delivery queue: {cash_delivery_queue.round(6)}, stock delivery queue: '
    #       f'{stock_delivery_queue.round(6)}\n delivered_cash: {delivered_cash:.2f}, '
    #       f'delivered_stocks: {delivered_stocks}')

    # 4, 更新持有现金和可用现金
    next_own_cash = np.round(own_cash + cash_gained.sum() + cash_spent.sum(), 3)
    next_available_cash = np.round(available_cash + delivered_cash + cash_spent.sum(), 3)
    # DEBUG
    # print(f'updating cash for day {day_num}, \n'
    #       f'own_cash: {own_cash:.2f} + cash_gained: {cash_gained.sum():.2f} + cash_spent: '
    #       f'{cash_spent.sum():.2f} = next_own_cash: {next_own_cash:.2f}\n'
    #       f'available_cash: {available_cash:.2f} + delivered_cash: {delivered_cash:.2f} + cash_spent: '
    #       f'{cash_spent.sum():.2f} = next_available_cash: {next_available_cash:.2f}')
    # 更新持有资产和可用资产
    next_own_amounts = np.round(own_amounts + amount_purchased + amount_sold, 3)
    next_available_amounts = np.round(available_amounts + delivered_stocks + amount_sold, 3)
    # DEBUG
    # print(f'updating amounts for day {day_num}, \n'
    #       f'own_amounts: {own_amounts.round(6)} + amount_purchased: {amount_purchased} + amount_sold: '
    #       f'{amount_sold} = next_own_amounts: {next_own_amounts.round(6)}\n'
    #       f'available_amounts: {available_amounts.round(6)} + delivered_stocks: {delivered_stocks} + amount_sold: '
    #       f'{amount_sold} = next_available_amounts: {next_available_amounts.round(6)}')

    # 5, 记录交易记录和交易费用
    current_trade_records = amount_purchased + amount_sold
    current_trade_cost = fees

    return (
        next_own_cash,
        next_available_cash,
        next_own_amounts,
        next_available_amounts,
        current_trade_records,
        current_trade_cost,
        cash_delivery_queue,
        stock_delivery_queue,
    )


@njit(cache=True, nogil=True)
def calculate_trade_results(
        signal_type: Union[int, np.int32, np.int64, np.ndarray],
        own_cash: Union[float, np.float64, np.ndarray],
        own_amounts: np.ndarray,
        available_cash: Union[float, np.float64, np.ndarray],
        available_amounts: np.ndarray,
        op_signal: np.ndarray,
        prices: np.ndarray,
        cost_params: np.ndarray,
        pt_buy_threshold: float,
        pt_sell_threshold: float,
        long_pos_limit: float,
        short_pos_limit: float,
        allow_sell_short: bool,
        moq_buy: float,
        moq_sell: float,
        cash_delivery_period: int,
) -> tuple[ndarray, ndarray, ndarray, ndarray, ndarray]:
    """ 该函数用于批量计算股票交易结果,根据交易信号、价格和持仓情况,结合交易
    成本和仓位限制,计算出每只股票的买入卖出数量、现金变动及交易费用。支持多种
    交易信号类型(PT、PS、VS)和做空机制,并通过Numba加速计算。

    Parameters
    ----------
    signal_type: int
        信号类型:
            0 - PT signal
            1 - PS signal
            2 - VS signal
    own_cash: float
        本次交易开始前持有的现金余额(包括交割中的现金)
    own_amounts: np.ndarray
        本次交易开始前持有的资产份额(包括交割中的资产份额)
    available_cash: np.ndarray
        本次交易开始前账户可用现金余额(交割中的现金不计入余额)
    available_amounts: np.ndarray:
        交易开始前各个股票的可用数量余额(交割中的股票不计入余额)
    op_signal: np.ndarray
        本次交易的个股交易信号
    prices: np.ndarray,
        本次交易发生时各个股票的交易价格
    cost_params: np.ndarray
        交易成本参数,包括固定买入费用、固定卖出费用、买入费率、卖出费率、最低买入费用、最低卖出费用
        buy_rate: float, 交易成本:固定买入费率
        sell_rate: float, 交易成本:固定卖出费率
        buy_min: float, 交易成本:最低买入费用
        sell_min: float, 交易成本:最低卖出费用
        slippage: float, 交易成本:滑点
    pt_buy_threshold: object Cost
        当交易信号类型为PT时,用于计算买入/卖出信号的强度阈值
    pt_sell_threshold: object Cost
        当交易信号类型为PT时,用于计算买入/卖出信号的强度阈值
    long_pos_limit: float
        允许建立的多头总仓位与净资产的比值,默认值1.0,表示最多允许建立100%多头仓位
    short_pos_limit: float
        允许建立的空头总仓位与净资产的比值,默认值-1.0,表示最多允许建立100%空头仓位
    allow_sell_short: bool
        True:   允许买空卖空
        False:  默认值,只允许买入多头仓位
    moq_buy: float:
        投资产品最小买入交易单位,moq为0时允许交易任意数额的金融产品,moq不为零时允许交易的产品数量是moq的整数倍
    moq_sell: float:
        投资产品最小买入交易单位,moq为0时允许交易任意数额的金融产品,moq不为零时允许交易的产品数量是moq的整数倍
    cash_delivery_period: int:
        资金交割周期,单位为天

    Returns
    -------
    tuple: (cash_gained, cash_spent, amounts_purchased, amounts_sold, fee)
        cash_gained:        ndarray, 本批次交易中获得的现金增加额
        cash_spent:         ndarray, 本批次交易中共花费的现金总额
        amounts_purchased:  ndarray, 交易后每个个股账户中的股份增加数量
        amounts_sold:       ndarray, 交易后每个个股账户中的股份减少数量
        fee:                ndarray, 本次交易每个个股的交易费用,包括卖出的费用和买入的费用
    """
    # 1,计算期初资产总额:交易前现金及股票余额在当前价格下的资产总额
    pre_values = own_amounts * prices
    total_value = own_cash + pre_values.sum()
    empty_array = np.zeros_like(op_signal)

    # 2,制定交易计划,生成交易意图(计划买入金额和计划卖出数量)
    if signal_type == 0:  # PT信号
        # 限定PT信号在多空持仓限额范围内,这样就不需要在后续处理交易指令时调整交易数量了
        trimmed_op_signal = trim_pt_type_signals(
                op_signals=op_signal,
                long_pos_limit=long_pos_limit,
                short_pos_limit=short_pos_limit,
        )
        cash_to_spend, amounts_to_sell = parse_pt_signals(
                signals=trimmed_op_signal,
                prices=prices,
                own_amounts=own_amounts,
                own_cash=own_cash,
                pt_buy_threshold=pt_buy_threshold,
                pt_sell_threshold=pt_sell_threshold,
                allow_sell_short=allow_sell_short
        )
        # DEBUG
        # print(f'parsed PT signals, trimmed_op_signal: {trimmed_op_signal.round(3)}, \n'
        #       f'cash_to_spend: {cash_to_spend.round(2)}, amounts_to_sell: {amounts_to_sell.round(2)}')

    elif signal_type == 1:  # PS信号
        cash_to_spend, amounts_to_sell = parse_ps_signals(
                signals=op_signal,
                prices=prices,
                own_amounts=own_amounts,
                own_cash=own_cash,
                allow_sell_short=allow_sell_short
        )
        # 如果是全零信号,则直接返回空结果
        if np.allclose(cash_to_spend, 0.) and np.allclose(amounts_to_sell, 0.):
            return (empty_array,
                    empty_array,
                    empty_array,
                    empty_array,
                    empty_array)

    elif signal_type == 2:  # VS信号
        cash_to_spend, amounts_to_sell = parse_vs_signals(
                signals=op_signal,
                prices=prices,
                own_amounts=own_amounts,
                allow_sell_short=allow_sell_short,
                cost_params=cost_params,
        )
        # 如果是全零信号,则直接返回空结果
        if np.allclose(cash_to_spend, 0.) and np.allclose(amounts_to_sell, 0.):
            return (empty_array,
                    empty_array,
                    empty_array,
                    empty_array,
                    empty_array)

    else:
        raise ValueError('Invalid signal_type')
    # # 这里还需要考虑交易价格为NaN的情况,如果价格为NaN,则不进行交易,直接将交易计划中的买入金额和卖出数量调整为0
    # cash_to_spend = np.where(np.isnan(prices), 0, cash_to_spend)
    # amounts_to_sell = np.where(np.isnan(prices), 0, amounts_to_sell)

    # 3, Operator 级卖出方向调整(可用持仓 / 卖空超额转化;空头超额转多头时含买入佣金预算)
    cash_to_spend, amounts_to_sell = sell_direction_adjustment(
            cash_to_spend,
            amounts_to_sell,
            own_amounts,
            available_amounts,
            prices,
            allow_sell_short,
            cost_params,
    )

    # DEBUG
    # print(f'Processed signals, calculated and adjusted'
    #       f'\ncash_to_spend: {cash_to_spend.round(2)}, \namounts_to_sell: {amounts_to_sell.round(2)}')
    # 4,批量提交股份卖出计划,计算实际卖出份额和交易费用
    amount_sold, cash_gained, fee_selling = get_selling_result(
            prices=prices,
            a_to_sell=amounts_to_sell,
            moq=moq_sell,
            cost_params=cost_params,
    )
    # DEBUG
    # print(f'calculated amount sell result:\n'
    #       f'amount sold: {amount_sold.round(2)}\n'
    #       f'cash gained: {cash_gained.round(2)}')

    if np.allclose(cash_to_spend, 0.0, atol=0.001):
        # 如果所有买入计划绝对值都小于1分钱,则直接跳过后续的计算
        return cash_gained, empty_array, empty_array, amount_sold, fee_selling

    # 5,买入方向调整(可用现金 / 多空限额;PT 且交割为 0 时可复用本期卖出现金)
    avail_cash_f = float(available_cash)
    cash_to_spend = buy_direction_adjustment(
            cash_to_spend,
            int(signal_type),
            int(cash_delivery_period),
            avail_cash_f,
            cash_gained,
            amount_sold,
            own_amounts,
            prices,
            long_pos_limit,
            short_pos_limit,
            allow_sell_short,
            float(total_value),
    )

    # 批量提交股份买入计划,计算实际买入的股票份额和交易费用dflasjdf
    # 由于已经提前确认过现金总额,因此不存在买入总金额超过持有现金的情况
    amount_purchased, cash_spent, fee_buying = get_purchase_result(
            prices=prices,
            cash_to_spend=cash_to_spend,
            moq=moq_buy,
            cost_params=cost_params,
    )
    # DEBUG
    # print(f'calculated purchase result: \namount_purchased: {amount_purchased.round(2)}'
    #       f'cash_spent: {cash_spent}')

    # 4, 计算购入资产产生的交易成本,买入资产和卖出资产的交易成本率可以不同,且每次交易动态计算
    fee = fee_buying + fee_selling
    apply_execution_slippage(
            prices, amount_purchased, amount_sold, cash_spent, cash_gained, fee, cost_params[4],
    )
    # DEBUG
    # print(f'finished calculation: \n'
    #       f'cash_gained: {cash_gained.round(2)}\n'
    #       f'amount_sold: {amount_sold.round(2)}\n'
    #       f'amount_purchased: {amount_purchased.round(2)}'
    #       f'cash_spent: {cash_spent}\n'
    #       f'fee: {fee.round(2)}\n')

    return cash_gained, cash_spent, amount_purchased, amount_sold, fee


@njit()  # 使用numba加速反而更慢,可能是因为函数体太简单,编译和调用开销大于计算开销
def initialize_backtest_delivery_queue(cash_delivery_period: int,
                                       stock_delivery_period: int,
                                       share_count: int):
    """ 初始化回测现金和股票交割队列,因为回测是批量进行的,因此需要通过交割队列进行快速交割计算

    Parameters
    ----------
    cash_delivery_period: int
        现金交割周期
    stock_delivery_period: int
        股票交割周期
    share_count: int
        股票数量

    Returns
    -------
    cash_delivery_queue: np.ndarray
        现金交割队列
    stock_delivery_queue: np.ndarray
        股票交割队列
    """

    cash_delivery_queue = np.zeros(shape=(cash_delivery_period + 1,), dtype='float')

    stock_delivery_queue = np.zeros(shape=(stock_delivery_period + 1, share_count), dtype='float')

    return cash_delivery_queue, stock_delivery_queue


@njit(nogil=True, cache=True)  # 使用numba加速反而更慢,可能是因为函数体太简单,编译和调用开销大于计算开销
def process_backtest_delivery(cash_delivery_queue: np.ndarray,
                              stock_delivery_queue: np.ndarray,
                              is_new_day: bool,
                              day_num: int,
                              new_cash: float,
                              new_stocks: np.ndarray,
                              cash_delivery_period: int,
                              stock_delivery_period: int,
                              share_count: int) -> tuple[Any, np.ndarray]:
    """ 处理回测现金和股票交割队列,计算达到交割期的现金和股票,并更新可用现金和可用股票

    此处需要非常注意区分:股现交割的时机是发生在交易完成前还是交易完成后,因为两种时机意味着
     交割判断是不同的。如果交割发生在交易完成前,那么在“换日”判断基于上一次交易是否与本次交易日期一
     致,如果不一致实际上处理的是前一日的交易结果,此时应该直接进行交割换日。
     但是如果在交易后执行交割,则“换日”判断基于下一次交易是否与本次交易日期一致,将本次交易结果加入
     交割队列后,当下一次交易换日时,才会进行交割换日。
     上述两种交割处理不同,在目前的实现中,交割发生在交易完成后,因此“换日”判断基于下一次交易是否
     与本次交易日期一致。应该按照交易后交割形式实现。

    Parameters
    ----------
    cash_delivery_queue: np.ndarray
        现金交割队列
    stock_delivery_queue: np.ndarray
        股票交割队列
    is_new_day: bool
        是否为新的一天,用于判断是否需要更新交割队列
    day_num: int
        当前的天数,用于确定交割队列中的交割位置
    new_cash: float
        新增的现金,用于加入交割队列
    new_stocks: np.ndarray
        新增的股票,用于加入交割队列
    cash_delivery_period: int
        现金交割周期
    stock_delivery_period: int
        股票交割周期
    share_count: int
        股票数量

    Returns
    -------
    cash_delivery_queue: np.ndarray
        更新后的现金交割队列
    stock_delivery_queue: np.ndarray
        更新后的股票交割队列
    cash_delivered: float
        达到交割期的现金
    stocks_delivered: np.ndarray
        达到交割期的股票数量
    """

    if cash_delivery_period == 0:
        cash_delivered = new_cash
    else:
        # 计算交割队列的起始位置,将新增现金加入交割队列起始位置
        cash_in_pos = day_num % cash_delivery_period
        cash_delivery_queue[cash_in_pos] += new_cash
        if is_new_day:
            # 如果下次交易是新的一天,查找现金交割位置执行交割,
            cash_delivery_pos = (day_num + 1) % cash_delivery_period
            cash_delivered = cash_delivery_queue[cash_delivery_pos]
            cash_delivery_queue[cash_delivery_pos] = 0.0
        else:
            cash_delivered = 0.0

    if stock_delivery_period == 0:
        stocks_delivered = new_stocks.copy()
    else:
        # 计算交割队列起始位置,将新增股票加入交割队列起始位置
        stock_in_pos = day_num % stock_delivery_period
        stock_delivery_queue[stock_in_pos, :] += new_stocks
        if is_new_day:
            # 如果下次交易是新的一天,查找股票交割位置执行交割,
            stock_delivery_pos = (day_num + 1) % stock_delivery_period
            stocks_delivered = stock_delivery_queue[stock_delivery_pos, :].copy()
            stock_delivery_queue[stock_delivery_pos, :] = np.zeros(shape=(share_count,), dtype='float')
        else:
            stocks_delivered = np.zeros(shape=(share_count,), dtype='float')

    return cash_delivered, stocks_delivered


# @njit(nogil=True, cache=True)
def backtest_batch_steps(
        signal_types: np.ndarray,
        op_signals: np.ndarray,
        cash_investment_array: np.ndarray,
        cash_inflation_array: np.ndarray,
        delivery_day_indicators: np.ndarray,
        own_cashes: np.ndarray,
        own_amounts_array: np.ndarray,
        available_cashes: np.ndarray,
        available_amounts_array: np.ndarray,
        trade_prices: np.ndarray,
        trade_records_array: np.ndarray,
        trade_cost_array: np.ndarray,
        cost_params: np.ndarray,
        pt_buy_threshold: float,
        pt_sell_threshold: float,
        long_pos_limit: float,
        short_pos_limit: float,
        allow_sell_short: bool,
        moq_buy: float,
        moq_sell: float,
        cash_delivery_period: int,
        stock_delivery_period: int,
) -> None:
    """批量处理多次交易的回测计算

    输入数据为整个交易过程的交易信号、交易价格、初始持仓和现金等完整的持仓表,
    循环调用backtest_step()函数,同时处理现金和持仓的交割,完成整个交易清单的回测计算

    Parameters
    ----------
    signal_types: np.ndarray[int]
        信号类型数组,所有交易信号的类型代码
    op_signals: np.ndarray[float]
        交易信号数组,所有交易信号
    cash_investment_array: np.ndarray
        现金投资数组,记录每一个交易信号日的现金投资金额
    cash_inflation_array: np.ndarray
        现金增值数组,记录每一个交易信号日相对前一个交易信号日的现金增值幅度
    delivery_day_indicators: np.ndarray
        交割日指标数组,记录每一个交易信号日是否为新的交割日
    own_cashes: np.ndarray
        持有现金清单,完整记录整个回测过程中的持有现金
    own_amounts_array: np.ndarray
        持有资产清单,完整记录整个回测过程中的持有资产
    available_cashes: np.ndarray
        可用现金清单,完整记录整个回测过程中的可用现金
    available_amounts_array: np.ndarray
        可用资产清单,完整记录整个回测过程中的可用资产
    trade_prices: np.ndarray
        交易价格清单,记录每一个运行交易记录时间戳中的各个资产的交易价格
    trade_records_array: np.ndarray
        交易记录清单,完整记录整个回测过程中每只股票的买卖数量记录
    trade_cost_array: np.ndarray
        交易成本清单,完整记录整个回测过程中的交易成本
    cost_params: np.ndarray
        交易成本参数,包括买入费率、卖出费率、最低买入费用、最低卖出费用、交易滑点
        buy_rate: float, 交易成本:固定买入费率
        sell_rate: float, 交易成本:固定卖出费率
        buy_min: float, 交易成本:最低买入费用
        sell_min: float, 交易成本:最低卖出费用
        slippage: float, 交易成本:滑点
    pt_buy_threshold: float
        当交易信号类型为PT时,用于计算买入/卖出信号的强度阈值
    pt_sell_threshold: float
        当交易信号类型为PT时,用于计算买入/卖出信号的强度阈值
    long_pos_limit: float
        允许建立的多头总仓位与净资产的比值,默认值1.0,表示最多允许建立100%多头仓位
    short_pos_limit: float
        允许建立的空头总仓位与净资产的比值,默认值-1.0,表示最多允许建立100%空头仓位
    allow_sell_short: bool
        True:   允许买空卖空
        False:  默认值,只允许买入多头仓位
    moq_buy: float:
        投资产品最小买入交易单位,moq为0时允许交易任意数额的金融产品,moq不为零时允许交易的产品数量是moq的整数倍
    moq_sell: float:
        投资产品最小买入交易单位,moq为0时允许交易任意数额的金融产品,moq不为零时允许交易的产品数量是moq的整数倍
    cash_delivery_period: int
        现金交割周期
    stock_delivery_period: int
        股票交割周期

    Returns
    -------
    None,
    交易的结果会被填充到传入的数组中
        own_cashes: 最终持有现金清单,完整记录整个回测过程中的持有现金变动情况
        available_cashes: 可用现金清单,完整记录整个回测过程中的可用现金变动情况
        own_amounts_array: 最终持有资产清单,完整记录整个回测过程中的持有资产变动情况
        available_amounts_array: 可用资产清单,完整记录整个回测过程中的可用资产变动情况
        trade_records_array: 交易记录清单,完整记录整个回测过程中的每只股票的买卖数量变动记录
        trade_cost_array: 交易费用清单,完整记录整个回测过程中的交易费用

    """

    signal_count = op_signals.shape[0]
    share_count = op_signals.shape[1]
    # 初始化现金和股票交割队列以及交割日计数器
    cash_delivery_queue, stock_delivery_queue = initialize_backtest_delivery_queue(
            cash_delivery_period=cash_delivery_period,
            stock_delivery_period=stock_delivery_period,
            share_count=share_count,
    )
    day_nums = delivery_day_indicators.cumsum().astype('int')

    # 开始循环处理op_signal中的每一条交易信号,获取其signal_type,执行下列步骤:
    for i in range(signal_count):
        # 如果当期有现金投资,则更新持有现金和可用现金
        cash_investment = cash_investment_array[i]
        if cash_investment > 0:
            own_cashes[i] += cash_investment
            available_cashes[i] += cash_investment

        is_delivery_day = bool(delivery_day_indicators[i])

        # 调用backtest_step函数,计算本次交易的现金变动、持仓变动和交易费用
        (own_cashes[i + 1],
         available_cashes[i + 1],
         own_amounts_array[i + 1],
         available_amounts_array[i + 1],
         trade_records_array[i],
         trade_cost_array[i],
         cash_delivery_queue,
         stock_delivery_queue) = backtest_step(
                signal_type=signal_types[i],
                op_signal=op_signals[i],
                cash_inflation=cash_inflation_array[i],
                is_delivery_day=is_delivery_day,
                day_num=day_nums[i],
                own_cash=own_cashes[i],
                own_amounts=own_amounts_array[i],
                available_cash=available_cashes[i],
                available_amounts=available_amounts_array[i],
                trade_prices=trade_prices[i],
                cost_params=cost_params,
                pt_buy_threshold=pt_buy_threshold,
                pt_sell_threshold=pt_sell_threshold,
                long_pos_limit=long_pos_limit,
                short_pos_limit=short_pos_limit,
                allow_sell_short=allow_sell_short,
                moq_buy=moq_buy,
                moq_sell=moq_sell,
                cash_delivery_queue=cash_delivery_queue,
                stock_delivery_queue=stock_delivery_queue,
                cash_delivery_period=cash_delivery_period,
                stock_delivery_period=stock_delivery_period,
                share_count=share_count,
        )

    # 完成全部交易信号的处理后,输出最终的持有现金清单、持有资产清单和交易费用清单
    return None


@njit(nogil=True, cache=True)
def backtest_flash_steps(
        signal_types: np.ndarray,
        op_signals: np.ndarray,
        cash_investment_array: np.ndarray,
        cash_inflation_array: np.ndarray,
        delivery_day_indicators: np.ndarray,
        trade_prices: np.ndarray,
        cost_params: np.ndarray,
        pt_buy_threshold: float,
        pt_sell_threshold: float,
        long_pos_limit: float,
        short_pos_limit: float,
        allow_sell_short: bool,
        moq_buy: float,
        moq_sell: float,
        cash_delivery_period: int,
        stock_delivery_period: int,
) -> tuple[Union[float, Any], Union[ndarray[Any, dtype[Any]], Any]]:
    """相比backtest_batch_steps()函数,以更快的速度批量处理多次交易的回测计算

    该函数省略所有中间计算变量的存储,仅保存最终结果,同时省略与最终结果无关的数据存储,
    从而提升计算速度、节省内存开销。

    输入数据与backtest_batch_steps()函数相似,但是不包含用于记录现金、持股变动以及
    交易记录和交易费用的整个数组。这些数据在函数内部仅仅作为中间变量,存储每一轮回测过
    程的期初结果和期末结果,在完成全部交易信号的回测计算后,输出最后一轮回测的期末结果。

    Parameters
    ----------
    signal_types: np.ndarray[int]
        信号类型数组,所有交易信号的类型代码
    op_signals: np.ndarray[float]
        交易信号数组,所有交易信号
    cash_investment_array: np.ndarray
        现金投资数组,记录每一个交易信号日的现金投资金额
    cash_inflation_array: np.ndarray
        现金增值数组,记录每一个交易信号日相对前一个交易信号日的现金增值幅度
    delivery_day_indicators: np.ndarray
        交割日指标数组,记录每一个交易信号日是否为新的交割日
    trade_prices: np.ndarray
        交易价格清单,记录每一个运行交易记录时间戳中的各个资产的交易价格
    cost_params: np.ndarray
        交易成本参数,包括买入费率、卖出费率、最低买入费用、最低卖出费用、交易滑点
        buy_rate: float, 交易成本:固定买入费率
        sell_rate: float, 交易成本:固定卖出费率
        buy_min: float, 交易成本:最低买入费用
        sell_min: float, 交易成本:最低卖出费用
        slippage: float, 交易成本:滑点
    pt_buy_threshold: float
        当交易信号类型为PT时,用于计算买入/卖出信号的强度阈值
    pt_sell_threshold: float
        当交易信号类型为PT时,用于计算买入/卖出信号的强度阈值
    long_pos_limit: float
        允许建立的多头总仓位与净资产的比值,默认值1.0,表示最多允许建立100%多头仓位
    short_pos_limit: float
        允许建立的空头总仓位与净资产的比值,默认值-1.0,表示最多允许建立100%空头仓位
    allow_sell_short: bool
        True:   允许买空卖空
        False:  默认值,只允许买入多头仓位
    moq_buy: float:
        投资产品最小买入交易单位,moq为0时允许交易任意数额的金融产品,moq不为零时允许交易的产品数量是moq的整数倍
    moq_sell: float:
        投资产品最小买入交易单位,moq为0时允许交易任意数额的金融产品,moq不为零时允许交易的产品数量是moq的整数倍
    cash_delivery_period: int
        现金交割周期
    stock_delivery_period: int
        股票交割周期

    Returns
    -------
    tuple(float, np.ndarray),
    交易的结果保存在最终的计算结果中
        closing_cash: 回测结束后最终持有现金清单
        closing_amounts: 回测结束后最终持有资产清单

    """

    signal_count = op_signals.shape[0]
    share_count = op_signals.shape[1]
    # 初始化现金和股票交割队列以及交割日计数器
    cash_delivery_queue, stock_delivery_queue = initialize_backtest_delivery_queue(
            cash_delivery_period=cash_delivery_period,
            stock_delivery_period=stock_delivery_period,
            share_count=share_count,
    )
    day_nums = delivery_day_indicators.cumsum().astype('int')
    opening_cash = 0.0
    opening_available_cash = 0.0
    opening_amounts = np.zeros(shape=(share_count,), dtype='float')
    opening_available_amounts = np.zeros(shape=(share_count,), dtype='float')

    closing_cash = 0.0
    closing_amounts = np.zeros(shape=(share_count,), dtype='float')

    # 开始循环处理op_signal中的每一条交易信号,获取其signal_type,执行下列步骤:
    for i in range(signal_count):
        # 如果当期有现金投资,则更新持有现金和可用现金
        cash_investment = cash_investment_array[i]
        if cash_investment > 0:
            opening_cash += cash_investment
            opening_available_cash += cash_investment

        is_delivery_day = bool(delivery_day_indicators[i])

        # 调用backtest_step函数,计算本次交易的现金变动、持仓变动和交易费用
        (closing_cash,
         closing_available_cash,
         closing_amounts,
         closing_available_amounts,
         _,
         _,
         cash_delivery_queue,
         stock_delivery_queue) = backtest_step(
                signal_type=signal_types[i],
                op_signal=op_signals[i],
                cash_inflation=cash_inflation_array[i],
                is_delivery_day=is_delivery_day,
                day_num=day_nums[i],
                own_cash=opening_cash,
                own_amounts=opening_amounts,
                available_cash=opening_available_cash,
                available_amounts=opening_available_amounts,
                trade_prices=trade_prices[i],
                cost_params=cost_params,
                pt_buy_threshold=pt_buy_threshold,
                pt_sell_threshold=pt_sell_threshold,
                long_pos_limit=long_pos_limit,
                short_pos_limit=short_pos_limit,
                allow_sell_short=allow_sell_short,
                moq_buy=moq_buy,
                moq_sell=moq_sell,
                cash_delivery_queue=cash_delivery_queue,
                stock_delivery_queue=stock_delivery_queue,
                cash_delivery_period=cash_delivery_period,
                stock_delivery_period=stock_delivery_period,
                share_count=share_count,
        )

        opening_cash = closing_cash
        opening_available_cash = closing_available_cash
        opening_amounts = closing_amounts
        opening_available_amounts = closing_available_amounts

    # 完成全部交易信号的处理后,输出最终的持有现金清单、持有资产清单和交易费用清单
    return closing_cash, closing_amounts


def generate_cash_invest_and_delivery_arrays(invest_cash_plan: CashPlan,
                                             group_merge_type: str,
                                             timing_table: pd.DataFrame) -> (tuple[np.ndarray, np.ndarray, np.ndarray]):
    """ 获取现金投资和通胀率相关参数,生成投资和通胀率数组

    Parameters
    ----------
    invest_cash_plan: CashPlan
        现金投资计划
    group_merge_type: str
        投资策略组合并类型。如果该类型为'NONE',则表示不进行组合并操作,则交易信号的
        数量可能会大于timing_table的长度,否则交易信号的数量与timing_table的长度
        相同
    timing_table: pd.DataFrame
        操作日时间索引, 用于生成对应长度的数组

    Returns
    -------
    cash_investment_array: np.ndarray
        现金投资数组
    inflation_rate_array: np.ndarray
        通胀率数组
    delivery_day_indicators: np.ndarray
        交割日指示数组, 非交割日为0,交割日为1
    """
    if group_merge_type.upper() == 'NONE':
        signal_length = int(timing_table.sum().sum())
        cash_plan_index = timing_table.index.repeat(timing_table.sum(axis=1).values)
    else:
        signal_length = len(timing_table)
        cash_plan_index = timing_table.index

    # 生成包含现金投资和现金通胀率数组的DataFrame
    cash_plan_df = pd.DataFrame(
            {'investment':     np.zeros(shape=(signal_length,), dtype=float),
             'inflation_rate': np.ones(shape=(signal_length,), dtype=float)},
            index=cash_plan_index,
    )
    investment_positions = np.searchsorted(cash_plan_index, invest_cash_plan.plan.index, side='left')
    for pos, amount in zip(investment_positions, invest_cash_plan.amounts):
        if pos < len(cash_plan_df):
            cash_plan_df.iat[pos, 0] += amount  # 累加投资金额

    inflation_rate = invest_cash_plan.ir
    day_diffs = (cash_plan_df.index - cash_plan_df.index[0]).days
    cash_plan_df['inflation_rate'] += inflation_rate * day_diffs / 365  # 年化通胀率转换为日化通胀率
    cash_investment_array = cash_plan_df['investment'].to_numpy()
    cash_inflation_array = cash_plan_df['inflation_rate'].to_numpy()
    cash_inflation_array = cash_inflation_array / np.roll(cash_inflation_array, 1)
    cash_inflation_array[0] = 1.0  # 第一天的通胀率设为1.0

    day_changes = np.diff(day_diffs.values, append=day_diffs.values[-1] + 1)  # 计算相邻日期的差值,最后一个日期后面添加一个新日期以确保最后一天的变化被记录
    day_changes[day_changes.nonzero()] = 1  # 将非零差值设为1,表示天数变化
    return cash_investment_array, cash_inflation_array, day_changes


# 定义一个Backtester类,该类包含一个operator对象,同时包含与operator回测相关的所有属性,同时提供回测结果的生成方法
[文档]class Backtester: """ Backtester类用于对operator对象进行回测操作。 本类的属性包括回测计算中所需的所有参数,包括回测过程中产生的结果数据,这些结果数据以ndarray的形式 在对象的生命周期内长期保存,并可以反复刷新。 这个类只有在operator对象被创建之后才能被实例化,因为Backtester类需要依赖operator对象来生成交易 信号和执行交易。典型用法如下: .. code-block:: python operator = Operator( ... ) # 创建Operator对象 backtested = Operator.backtest( signal_count=100, share_count=10, **kwargs) # 创建Backtester对象 # get backtest raw results: backtested.cash_investment_array backtested.own_cashes ... # get backtest results as DataFrame: result_df = backtested.value_records() trade_log_df = backtested.trade_logs() trade_summary_df = backtested.trade_summary() Attributes ---------- op: Operator 交易操作对象,包含交易信号生成和交易执行的逻辑 """ def __init__(self, op: Operator, shares: list[str], cash_plan: CashPlan, cash_investment_array: np.ndarray, cash_inflation_array: np.ndarray, delivery_day_indicators: np.ndarray, cost_params: np.ndarray, # 交易成本参数 signal_parsing_params: dict, # 交易信号解析参数 trading_moq_params: dict, # 交易最小单位参数 trading_delivery_params: dict, # 交易交割参数 trade_price_data: np.ndarray, # 交易价格数据 benchmark_data: Optional[Union[pd.DataFrame, pd.Series]] = None, evaluate_price_data: Optional[pd.DataFrame] = None, enable_tracing: bool = False, logger: Optional[logging.Logger] = None): """ 初始化Backtester对象,设置operator对象和回测参数,初始化回测结果存储表格 Parameters ---------- op: Operator 交易操作对象,包含交易信号生成和交易执行的逻辑 shares: list[str] 交易标的列表,包含所有交易标的的代码 cash_plan: CashPlan, 现金投资计划 benchmark_data: pd.DataFrame or pd.Series 用于评价回测结果的业绩基准价格,频率为日频,每日收盘价,其索引为回测开始日到结束日的所有交易日日期,列名为benchmark evaluate_price_data: pd.DataFrame, optional 用于评价回测结果的日频收盘价数据,索引为交易日15:00:00,列为资产代码 cash_investment_array: np.ndarray 现金投资数组,记录每一个交易信号日的现金投资金额 cash_inflation_array: np.ndarray 现金增值数组,记录每一个交易信号日相对前一个交易信号日的现金增值幅度 delivery_day_indicators: np.ndarray 交割日指标数组,记录每一个交易信号日是否为新的交割日 cost_params: np.ndarray 交易成本参数,包括买入费率、卖出费率、最低买入费用、最低卖出费用、交易滑点 buy_rate: float, 交易成本:固定买入费率 sell_rate: float, 交易成本:固定卖出费率 buy_min: float, 交易成本:最低买入费用 sell_min: float, 交易成本:最低卖出费用 slippage: float, 交易成本:滑点 signal_parsing_params: dict 交易信号解析参数字典,包含解析交易信号所需的所有参数,通常是parse_signal_parsing_params()函数的输出 trading_moq_params: dict 交易最小单位参数字典,包含交易最小单位相关的所有参数,通常是parse_trading_moq_params()函数的输出 trading_delivery_params: dict 交易交割参数字典,包含交易交割相关的所有参数,通常是parse_trading_delivery_params()函数的输出 trade_price_data: np.ndarray 交易价格数据,记录每一个运行交易记录时间戳中的各个资产的交易价格 enable_tracing: bool, optional, default=False 是否启用回测过程的性能追踪功能,默认值为False logger: Optional[logging.Logger] 可选的日志记录器对象,用于记录回测过程中的日志信息 """ # 参数基础校验 assert isinstance(op, Operator), "op must be an instance of Operator" if isinstance(shares, str): shares = str_to_list(shares) assert isinstance(shares, list) and all(isinstance(s, str) for s in shares), "shares must be a list of strings" # benchmark 必须是一个pd.Series,如果是DataFrame,则转换为Series if isinstance(benchmark_data, pd.DataFrame): if benchmark_data.shape[1] != 1: raise ValueError("benchmark_data DataFrame must have only one column") benchmark_data = benchmark_data.iloc[:, 0] if benchmark_data is not None and not isinstance(benchmark_data, pd.Series): raise TypeError("benchmark_data must be a pandas Series or DataFrame with one column") # 参数一致性校验 n_signals = op.get_signal_count() share_count = len(shares) arrays_to_check = [ ("cash_investment_array", cash_investment_array, (n_signals,)), ("cash_inflation_array", cash_inflation_array, (n_signals,)), ("delivery_day_indicators", delivery_day_indicators, (n_signals,)), ("trade_price_data", trade_price_data, (n_signals, share_count)) ] for name, arr, shape in arrays_to_check: if not isinstance(arr, np.ndarray): raise TypeError(f"{name} must be a numpy array") if arr.shape != shape: raise ValueError(f"{name} should have shape {shape}, but got {arr.shape}") self.op = op self.op_signals: Optional[np.ndarray] = None # 回测生成的交易信号表格,实际上在op内也可以存储 self.shares = shares self.cash_investment_array = cash_investment_array self.cash_inflation_array = cash_inflation_array self.delivery_day_indicators = delivery_day_indicators self.cost_params = cost_params self.signal_parsing_params = signal_parsing_params self.trading_moq_params = trading_moq_params self.trading_delivery_params = trading_delivery_params self.trade_price_data = trade_price_data self.logger = logger self.op_run_time = 0.0 # operator运行时间,单位秒 self.backtest_run_time = 0.0 # 回测运行时间,单位秒 # 1,检查operator对象是否已经准备好,否则raise error, TOOD: 是否有必要? # op.is_ready(raise_error=True) if logger is not None: logger.info('Start backtest operator...') # 2,从operator对象读取交易运行计划和时间表,获取交易信号长度,生成用于存储交易信号和持仓数据的表格 self.op_schedule = op.group_timing_table.index self.n_signals = op.get_signal_count() self.share_count = len(self.shares) # 3.1 现金和股票持仓历史记录表 shape_assets = (self.n_signals + 1, self.share_count) shape_cashes = (self.n_signals + 1,) self.own_cashes = np.zeros(shape=shape_cashes, dtype=float) self.own_amounts_array = np.zeros(shape=shape_assets, dtype=float) self.available_cashes = np.zeros(shape=shape_cashes, dtype=float) self.available_amounts_array = np.zeros(shape=shape_assets, dtype=float) # 3.2 交易过程数据记录表,包括交易记录、交易成本等 shape_signals = (self.n_signals, self.share_count) self.trade_records_array = np.zeros(shape_signals, dtype=float) # 记录每次交易的买卖数量 self.trade_cost_array = np.zeros(shape_signals, dtype=float) # 记录每次交易的交易成本 self.trade_price_array = np.zeros(shape_signals, dtype=float) # 记录每次交易的成交价格,未成交为0,dynamic数据类型的operator需要用到 # 4, 回测交易最终结果:评价指标、交易日志和交易汇总记录 self.backtest_result: dict = {} self.trade_log_df: Optional[pd.DataFrame] = None self.summary_df: Optional[pd.DataFrame] = None self.trace_df: Optional[pd.DataFrame] = None # 5, 其他相关属性(需要增加数据匹配性校验) self.cash_plan = cash_plan self.benchmark_data = benchmark_data self.evaluate_price_data = evaluate_price_data self.enable_tracing = enable_tracing if logger is not None: logger.info('Start backtest operator...')
[文档] def run(self) -> 'Backtester': """ 执行回测计算,生成回测结果数据并存入对象属性中""" self.op.set_shares(self.shares) if self.enable_tracing: self.op.enable_tracing() else: self.op.disable_tracing() # 1,如果operator的交易信号不依赖于回测数据,调用函数backtest_operator_independently()处理回测信号 if not self.op.check_dynamic_data(): if self.logger is not None: self.logger.info('Backtest operator with only static data...') signals = self._backtest_static_operator() # 2,如果operator的交易信号依赖于回测数据,调用函数backtest_operator_dependently()处理回测信号 else: if self.logger is not None: self.logger.info('Backtest operator with dynamic data dependence...') signals = self._backtest_dynamic_operator() self.op_signals = signals if self.logger is not None: self.logger.info('Backtest completed.') return self
[文档] def clear_backtest_buffers(self): """ 清除回测结果缓存数据,将回测结果数据重置为空数组,以便重新进行回测计算 Returns ------- None """ self.op_signals = None self.own_cashes.fill(0.0) self.own_amounts_array.fill(0.0) self.available_cashes.fill(0.0) self.available_amounts_array.fill(0.0) self.trade_records_array.fill(0.0) self.trade_cost_array.fill(0.0) self.backtest_result.clear() self.trade_log_df = None self.summary_df = None
def _backtest_static_operator(self) -> np.ndarray: """处理operator的交易信号仅包含静态数据类型(不依赖交易结果的数据)的情况: """ # 1,调用operator.run()生成完整的交易信号清单,并计算保存运行时间 stypes = np.zeros(self.op.get_signal_count(), dtype=int) s_indices = np.zeros(self.op.get_signal_count(), dtype=int) signals = np.zeros((self.op.get_signal_count(), self.share_count), dtype=float) signal_index = 0 st = time.time() for stype, s_index, signal in self.op.run_strategies(steps=range(len(self.op.group_timing_table))): stypes[signal_index] = SIGNAL_TYPE_ID[stype] s_indices[signal_index] = s_index signals[signal_index, :] = signal signal_index += 1 et = time.time() self.op_run_time = et - st # 2,调用backtest_batch_steps()进行回测,填充回测结果清单 st = time.time() backtest_batch_steps( signal_types=stypes, op_signals=signals, cash_investment_array=self.cash_investment_array, cash_inflation_array=self.cash_inflation_array, delivery_day_indicators=self.delivery_day_indicators, own_cashes=self.own_cashes, available_cashes=self.available_cashes, own_amounts_array=self.own_amounts_array, available_amounts_array=self.available_amounts_array, trade_prices=self.trade_price_data, trade_records_array=self.trade_records_array, trade_cost_array=self.trade_cost_array, cost_params=self.cost_params, **self.signal_parsing_params, **self.trading_moq_params, **self.trading_delivery_params, ) et = time.time() self.backtest_run_time = et - st return signals def _backtest_dynamic_operator(self) -> np.ndarray: """处理operator的交易信号包含动态数据类型(依赖交易结果的数据类型)的情况: 根据输入参数逐步调用operator.run()生成交易信号清单,然后调用backtest_batch_steps()进行回测""" # TODO: 实现交易过程动态数据的获取 # 1,读取初始持仓和现金数据,更新operator中的依赖性历史数据 signals = np.zeros((self.op.get_signal_count(), self.share_count), dtype=float) # 为 Operator 注入完整的交易过程数据源与时间索引,供策略通过 get_data('proc.xxx', ...) 访问。 # 注意:这里不改变原有 dynamic data buffer 机制,仅额外提供 process data 视图。 try: self.op._process_time_index = self.op.op_signal_index.get_level_values(0).to_numpy() except Exception: self.op._process_time_index = None if hasattr(self.op, "_process_data_sources"): self.op._process_data_sources = { "own_cashes": self.own_cashes, "available_cashes": self.available_cashes, "own_amounts": self.own_amounts_array, "available_amounts": self.available_amounts_array, "trade_records": self.trade_records_array, "trade_costs": self.trade_cost_array, # 使用实际成交价格作为交易价格过程数据 "trade_prices": self.trade_price_array, # 使用交易模拟价格作为估值价格,用于 position_value / total_value 一类派生量 "price_data": self.trade_price_data, } self.op.prepare_dynamic_data_buffer( trade_records=self.trade_records_array, # 成交量 trade_prices=self.trade_price_array, # 成交价格 own_cashes=self.own_cashes, # 持有现金 available_cashes=self.available_cashes, holding_positions=self.own_amounts_array, available_positions=self.available_amounts_array, ) cash_delivery_queue, stock_delivery_queue = initialize_backtest_delivery_queue( share_count=self.share_count, **self.trading_delivery_params, ) day_nums = self.delivery_day_indicators.cumsum() timer = FunctionTimer() bt_step = 0 # 循环执行下面步骤,直至完整生成回测结果清单 for i in range(len(self.op.group_timing_table)): # 1,调用operator.run_strategy()生成当前交易信号,注意同一时刻可能会有多组信号生成 # print(f'running / backtest step {i+1}/{bt_step + 1}...') for result in timer.time_function('op_run', self.op.run_strategy, step_index=i): # print(f'got result from op.run_strategy: {result}') stype, s_index, signal = result # 2,开始回测,判断是否有资金投入,如果有,更新持有现金和可用现金 cash_investment = self.cash_investment_array[bt_step] if cash_investment > 0: self.own_cashes[bt_step] += cash_investment self.available_cashes[bt_step] += cash_investment signal_type = SIGNAL_TYPE_ID[stype] is_delivery_day = bool(self.delivery_day_indicators[bt_step]) signals[bt_step, :] = signal # 3,调用backtest_step()回测当前交易信号的结果,生成当前交易回测结果 ( self.own_cashes[bt_step + 1], self.available_cashes[bt_step + 1], self.own_amounts_array[bt_step + 1], self.available_amounts_array[bt_step + 1], self.trade_records_array[bt_step], self.trade_cost_array[bt_step], cash_delivery_queue, stock_delivery_queue, ) = timer.time_function( 'backtest', backtest_step, signal_type=signal_type, op_signal=signal, cash_inflation=self.cash_inflation_array[bt_step], is_delivery_day=is_delivery_day, day_num=day_nums[bt_step], own_cash=self.own_cashes[bt_step], own_amounts=self.own_amounts_array[bt_step, :], available_cash=self.available_cashes[bt_step], available_amounts=self.available_amounts_array[bt_step, :], trade_prices=self.trade_price_data[bt_step, :], cost_params=self.cost_params, cash_delivery_queue=cash_delivery_queue, stock_delivery_queue=stock_delivery_queue, share_count=self.share_count, **self.signal_parsing_params, **self.trading_moq_params, **self.trading_delivery_params, ) bt_step += 1 # # 4,更新operator中的依赖性历史数据,主要是trade_prices_array数据(成交价格数据,因为这个价格需要计算出来) self.trade_price_array[:] = np.abs(np.sign(self.trade_records_array)) * self.trade_price_data time = timer.get_stats() self.op_run_time = time['op_run'] self.backtest_run_time = time['backtest'] # 5,返回signals,因为完整的回测结果清单已经保存在作为参数传入的几个数组中 return signals # 生成回测结果的各种评价指标,直接快速计算返回回测的各项结果指标
[文档] def trade_result_final_value(self): """ 直接快速计算返回回测的终值结果""" final_value = (self.trade_price_data * self.own_amounts_array[1:]).sum(axis=1) + self.own_cashes[1:] return final_value[-1]
[文档] def trade_result_volatility(self): """ 直接快速计算返回回测的波动率结果""" value_history = (self.trade_price_data * self.own_amounts_array[1:]).sum(axis=1) + self.own_cashes[1:] rolled_value_history = np.roll(value_history, 1) returns = (value_history - rolled_value_history) / rolled_value_history returns[0] = 0.0 # 第一天的收益率设为0.0 volatility = returns.std() * np.sqrt(252) return volatility
[文档] def trade_result_max_drawdown(self): """ 直接快速计算返回回测的最大回撤结果""" value_history = (self.trade_price_data * self.own_amounts_array[1:]).sum(axis=1) + self.own_cashes[1:] rolling_max = np.maximum.accumulate(value_history) drawdown = (value_history - rolling_max) / rolling_max max_drawdown = drawdown.min() return max_drawdown
# 生成更加结构化的DataFrame型交易结果数据,以便用于结果的评价及后续处理,
[文档] def trade_result_df(self) -> pd.DataFrame: """ 根据回测结果生成资产价值记录,输出内容为DataFrame格式 Returns ------- value_history: pd.DataFrame 交易模拟结果数据 """ if self.evaluate_price_data is None: value_history = pd.DataFrame(self.own_amounts_array[1:], index=self.op.op_signal_index.get_level_values(0), columns=self.shares) value_history['cash'] = self.own_cashes[1:] value_history['value'] = (self.trade_price_data * self.own_amounts_array[1:]).sum(axis=1) + self.own_cashes[1:] value_history['fee'] = self.trade_cost_array.sum(axis=1) value_history = value_history.groupby(value_history.index).last() return value_history daily_index = self.evaluate_price_data.index step_times = pd.to_datetime(self.op.op_signal_index.get_level_values(0)) step_times_values = step_times.to_numpy() daily_values_ts = daily_index.to_numpy() # 对于第一个交易信号之前的日期,使用初始持仓(索引0); # 对于两个交易信号之间的日期,使用最近一次交易后的持仓; # 对于最后一个交易信号之后的日期,使用最后一次交易后的持仓。 pos_idx = np.searchsorted(step_times_values, daily_values_ts, side='right') pos_idx = np.clip(pos_idx, 0, self.own_amounts_array.shape[0] - 1) daily_positions = self.own_amounts_array[pos_idx, :] daily_cash = self.own_cashes[pos_idx] # 将每一步交易费用按“日期”聚合为“每日总费用”,再与日频索引按日期对齐 step_dates = step_times.normalize() step_fee = self.trade_cost_array.sum(axis=1) fee_by_date = pd.Series(step_fee, index=step_dates).groupby(level=0).sum() daily_dates = daily_index.normalize() daily_fee = fee_by_date.reindex(daily_dates).fillna(0.0) # 使用评价用日频收盘价构造日频价格序列 daily_prices = self.evaluate_price_data.reindex(daily_index).reindex(columns=self.shares) price_array = np.nan_to_num(daily_prices.values, nan=0.0) daily_values = (price_array * daily_positions).sum(axis=1) + daily_cash # 一次性拼接持仓列与价格列,避免逐列插入导致 DataFrame 碎片化 positions_df = pd.DataFrame(daily_positions, index=daily_index, columns=self.shares) price_df = daily_prices.reindex(columns=self.shares).rename(columns=lambda c: 'p-' + c) value_history = pd.concat([positions_df, price_df], axis=1) value_history['cash'] = daily_cash value_history['value'] = daily_values value_history['fee'] = daily_fee.values return value_history
[文档] def trace_result_df(self) -> Optional[DataFrame]: """ 根据回测结果生成交易过程记录,输出内容为DataFrame格式 trace 行与 op_signal_index 的对齐由 Operator 在写入时保证(update_trace_step 使用 全局 signal 行号),此处仅按策略 concat 后设置 index 为 op_signal_index。 Returns ------- trade_trace: pd.DataFrame 交易模拟过程数据 """ if not self.enable_tracing: return self.trace_df op_signal_index = self.op.op_signal_index trace_dfs = [s.get_trace_data() for s in self.op.strategies] if not trace_dfs: self.trace_df = pd.DataFrame(index=op_signal_index) return self.trace_df trade_trace = pd.concat(trace_dfs, axis=1) trade_trace.index = op_signal_index self.trace_df = trade_trace return self.trace_df
[文档] def evaluate_result(self, indicators: str) -> dict: """生成交易结果的评价报告,保存在self.evaluate_result属性中 Parameters ---------- indicators: str 回测结果评价指标,详情参见qteasy.evaluate.evaluate()函数 Returns ------- """ self.backtest_result.update( evaluate( looped_values=self.trade_result_df(), hist_benchmark=self.benchmark_data, cash_plan=self.cash_plan, indicators=indicators, ) ) self.backtest_result['op_run_time'] = self.op_run_time self.backtest_result['loop_run_time'] = self.backtest_run_time return self.backtest_result
# 生成回测结果的明细报告,包括纯文本形式的报告和图表形式的报告
[文档] def report_result(self, trade_log: str = None, trade_summary: str = None) -> str: """ 生成回测结果的明细报告,报告为纯文本格式,可以使用print命令打印 Parameters ---------- trade_log: str, optional 交易日志文件的存储路径,默认值为None,如果给出该路径,则在报告中打印交易日志的存储路径 trade_summary: str, optional 交易汇总记录文件的存储路径,默认值为None,如果给出,则在报告中打印交易汇总记录的存储路径 Returns ------- report_str: str 以打印格式排版的回测结果报告 """ if self.backtest_result.get('complete_values') is not None: self.backtest_result['report'] = _loop_report_str( loop_results=self.backtest_result, ) return self.backtest_result['report'] else: return 'Complete evaluation of backtest result is not created!'
[文档] def plot_result(self, plot_title: str, show_positions: bool, buy_sell_markers: bool) -> None: """ 以图表形式生成交易结果 Parameters ---------- plot_title: str 图表的标题名称 show_positions: bool 是否显示持股仓位区间信息,如果设置为True,则在收益率曲线图上 以红色/绿色条带显示区间的持仓类型(绿色表示持多仓,红色表示持 空仓)颜色越深持仓比例越高 buy_sell_markers: bool 是否在收益率曲线图上显示买卖点,如果设置为True,则在收益率曲 线图上以红绿色小箭头标示出买卖点 Returns ------- None """ if self.backtest_result.get('complete_values') is not None: _plot_loop_result( loop_results=self.backtest_result, plot_title=plot_title, show_positions=show_positions, buy_sell_markers=buy_sell_markers, ) else: err = RuntimeError('Complete evaluation of backtest result is not created!') raise err
# 根据回测结果生成交易日志,包含更加完整的交易记录,输出内容为DataFrame格式,并且可以保存为csv文件
[文档] def generate_trade_logs( self, save_to_file_path: Union[str, None] = None, ) -> DataFrame: """根据回测结果生成交易日志,交易日志是一份完整的交易记录文件,包含每一个交易期间的下列信息。 每一个交易期间包含 8 行数据,分别为: - ``0, trade signal``:每一支股票的当期交易信号 - ``1, price``:每一支股票的当期交易价格 - ``2, traded amounts``:每一支股票的当期交易数量,如果没有交易则为 0 - ``3, cash changed``:每一支股票的当期现金变动金额,买入为负数,卖出为正数 - ``4, trade cost``:每一支股票的当期交易费用 - ``5, own amounts``:每一支股票的当期末持有数量 - ``6, available amounts``:每一支股票的当期末可用数量 - ``7, summary``:当期每一支股票的持仓价值,同时包含汇总数据(当期末持有现金、可用现金、总资产价值) 以上信息以 DataFrame 形式保存,行索引为多级索引,第一/二层为时间/策略组索引,第三层为上述 8 个数据类别。 交易日志文件可以被保存为 csv 格式,文件名为 ``trade_log.csv``。 Parameters ---------- save_to_file_path: str, optional 如果提供了文件路径,则将交易日志保存为CSV文件,默认值为None Returns ------- trade_log: pd.DataFrame 交易模拟结果数据 """ if self.logger: # create share trading logs: self.logger.info(f'generating detailed trading log ...') if self.share_count == 0: raise ValueError('shares list is empty, cannot create trade logs!') # 生成 trade log 详细表的股票持仓变化详情部分 (每支股票每期的交易信号、价格、交易数量、交易费用、期末持有数量、期末可用数量、持仓价值等) op_signal_index = self.op.op_signal_index trade_signal_df = pd.DataFrame(self.op_signals, index=op_signal_index, columns=self.shares) trade_price_df = pd.DataFrame(np.round(self.trade_price_data, 3), index=op_signal_index, columns=self.shares) own_amounts_df = pd.DataFrame(np.round(self.own_amounts_array[1:], 3), index=op_signal_index, columns=self.shares) available_amounts_df = pd.DataFrame(np.round(self.available_amounts_array[1:], 3), index=op_signal_index, columns=self.shares) trade_records_df = pd.DataFrame(np.round(self.trade_records_array, 3), index=op_signal_index, columns=self.shares) trade_cost_df = pd.DataFrame(np.round(self.trade_cost_array, 3), index=op_signal_index, columns=self.shares) cash_changed_df = pd.DataFrame(np.round(-trade_price_df * trade_records_df - self.trade_cost_array, 3), index=op_signal_index, columns=self.shares) amounts_value_df = pd.DataFrame(np.round(trade_price_df * self.own_amounts_array[1:], 3), index=op_signal_index, columns=self.shares) combined_data = pd.concat( objs=[trade_signal_df, trade_price_df, trade_records_df, cash_changed_df, trade_cost_df, own_amounts_df, available_amounts_df, amounts_value_df, ], keys=['0, trade signal', '1, price', '2, traded amounts', '3, cash changed', '4, trade cost', '5, own amounts', '6, available amounts', '7, summary'], ) combined_data = combined_data.reorder_levels([1, 2, 0]).sort_index(level=0) # 生成 trade log 详细表的每期汇总数据部分(当期现金投入、期末持有现金、期末可用现金、期末总价值) add_investments = pd.Series(self.cash_investment_array, index=op_signal_index, name='add. invest') own_cash_series = pd.Series(np.round(self.own_cashes[1:], 3), index=op_signal_index, name='own cash') available_cash_series = pd.Series(np.round(self.available_cashes[1:], 3), index=op_signal_index, name='available cash') total_values = (self.trade_price_data * self.own_amounts_array[1:]).sum(axis=1) + self.own_cashes[1:] total_value_series = pd.Series(np.round(total_values, 3), index=op_signal_index, name='value') summary_data = [add_investments, own_cash_series, available_cash_series, total_value_series] if self.enable_tracing: trace_df = self.trace_result_df() if trace_df is not None: summary_data.append(trace_df) self.summary_df = pd.concat( objs=summary_data, axis=1, ) self.summary_df = self.summary_df.assign(summary='7, summary').set_index('summary', append=True) self.summary_df.index.names = [None, None, None] # 上面将summary_df的索引变为多级索引,第三层为'7, summary',与combined_data的第三层索引对应,以便join self.trade_log_df = self.summary_df.join(combined_data, how='outer', sort=False) if save_to_file_path is not None: self.trade_log_df.to_csv(save_to_file_path, encoding='utf-8') if self.logger: self.logger.info(f'trade log saved to {save_to_file_path}') self.backtest_result['trade_log'] = save_to_file_path # return self.trade_log_df
# 根据回测结果生成交易汇总表,输出内容为DataFrame格式,并且可以保存为csv文件
[文档] def generate_trade_summary( self, share_names: Union[list[str], None] = None, save_to_file_path: Union[str, None] = None, ) -> pd.DataFrame: """ 生成 trade summary 交易摘要表 (一个更加紧凑的交易汇总表,包含每次交易的关键信息, 以一种更加易于人类阅读的方式呈现,并过滤掉无交易的记录),函数的输入trade_log_df是函数 generate_trade_logs()的返回值。 Parameters ---------- share_names: list[str], optional 交易标的名称列表, 如果为None,则使用“N/A”作为名称 save_to_file_path: str, optional 如果提供了文件路径,则将交易摘要表保存为CSV文件,默认值为None,不保存文件 """ if self.logger is not None: # create share trading logs: self.logger.info(f'generating abstract trading log ...') if self.share_count == 0: raise ValueError('shares list is empty, cannot create trade summary!') if any(share not in self.trade_log_df.columns for share in self.shares): missing_share = [share for share in self.shares if share not in self.trade_log_df.columns] raise KeyError( f'some shares ({missing_share}) are not in trade_log_df columns, cannot create trade summary!') # 处理share_names if share_names is None: share_names = ['N/A' for _ in self.shares] share_logs = [] # trade_log_df_no_duplicate = self.trade_log_df[~self.trade_log_df.index.duplicated(keep='last')] for share, share_name in zip(self.shares, share_names): share_df = self.trade_log_df[share].unstack() share_df = share_df[share_df['2, traded amounts'] != 0] share_df['code'] = share share_df['name'] = share_name share_logs.append(share_df) re_columns = ['code', 'name', '0, trade signal', '1, price', '2, traded amounts', '3, cash changed', '4, trade cost', '5, own amounts', '6, available amounts', '7, summary'] op_log_shares_abs = pd.concat(share_logs).reindex(columns=re_columns) self.summary_df.index = self.summary_df.index.droplevel(-1) # 去掉index中的’7, summary‘层级以便join self.summary_df = self.summary_df.join(op_log_shares_abs, how='right', sort=True) if save_to_file_path is not None: self.summary_df.to_csv(save_to_file_path, encoding='utf-8') if self.logger is not None: self.logger.info(f'trade summary saved to {save_to_file_path}') self.backtest_result['trade_summary'] = save_to_file_path # return self.summary_df
[文档] def save_complete_values(self, save_to_file_path: Optional[str] = None) -> Optional[str]: """ 将 complete_values 保存为 CSV 文件(当 trade_log=True 时由 qt_operator 调用) Parameters ---------- save_to_file_path: str, optional 保存路径,若为 None 则不写入 Returns ------- str or None 成功时返回保存路径,否则返回 None """ cv = self.backtest_result.get('complete_values') if save_to_file_path is None or cv is None or (isinstance(cv, pd.DataFrame) and cv.empty): if self.logger: self.logger.debug( 'save_complete_values skipped: path=%s, complete_values present=%s', save_to_file_path, cv is not None and (not isinstance(cv, pd.DataFrame) or not cv.empty), ) return None cv.to_csv(save_to_file_path, encoding='utf-8') self.backtest_result['complete_values_file'] = save_to_file_path if self.logger: self.logger.info(f'complete values (value curve) saved to {save_to_file_path}') return save_to_file_path