跳至内容

高级策略

本页解释了策略中可用的一些高级概念。如果您刚开始使用,请先熟悉 Freqtrade 基础知识策略自定义 中描述的方法。

此处所述方法的调用顺序在 机器人执行逻辑 中有详细说明。这些文档也有助于确定哪种方法最适合您的自定义需求。

注意

回调方法应在策略使用时才实现。

提示

通过运行 freqtrade new-strategy --strategy MyAwesomeStrategy --template advanced,可以从包含所有可用回调方法的策略模板开始。

存储信息(持久化)

Freqtrade 允许在数据库中存储/检索与特定交易相关联的用户自定义信息。

通过交易对象,可以使用 trade.set_custom_data(key='my_key', value=my_value) 存储信息,并使用 trade.get_custom_data(key='my_key') 检索信息。每个数据条目都与一笔交易以及用户提供的键(类型为 string)相关联。这意味着这只能在也提供交易对象的回调中使用。

为了使数据能够存储在数据库中,Freqtrade 必须对数据进行序列化。这是通过将数据转换为 JSON 格式的字符串来完成的。Freqtrade 在检索时会尝试逆转此操作,因此从策略的角度来看,这一点并不重要。

from freqtrade.persistence import Trade
from datetime import timedelta

class AwesomeStrategy(IStrategy):

    def bot_loop_start(self, **kwargs) -> None:
        for trade in Trade.get_open_order_trades():
            fills = trade.select_filled_orders(trade.entry_side)
            if trade.pair == 'ETH/USDT':
                trade_entry_type = trade.get_custom_data(key='entry_type')
                if trade_entry_type is None:
                    trade_entry_type = 'breakout' if 'entry_1' in trade.enter_tag else 'dip'
                elif fills > 1:
                    trade_entry_type = 'buy_up'
                trade.set_custom_data(key='entry_type', value=trade_entry_type)
        return super().bot_loop_start(**kwargs)

    def adjust_entry_price(self, trade: Trade, order: Order | None, pair: str,
                           current_time: datetime, proposed_rate: float, current_order_rate: float,
                           entry_tag: str | None, side: str, **kwargs) -> float:
        # Limit orders to use and follow SMA200 as price target for the first 10 minutes since entry trigger for BTC/USDT pair.
        if (
            pair == 'BTC/USDT' 
            and entry_tag == 'long_sma200' 
            and side == 'long' 
            and (current_time - timedelta(minutes=10)) > trade.open_date_utc 
            and order.filled == 0.0
        ):
            dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe)
            current_candle = dataframe.iloc[-1].squeeze()
            # store information about entry adjustment
            existing_count = trade.get_custom_data('num_entry_adjustments', default=0)
            if not existing_count:
                existing_count = 1
            else:
                existing_count += 1
            trade.set_custom_data(key='num_entry_adjustments', value=existing_count)

            # adjust order price
            return current_candle['sma_200']

        # default: maintain existing order
        return current_order_rate

    def custom_exit(self, pair: str, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, **kwargs):

        entry_adjustment_count = trade.get_custom_data(key='num_entry_adjustments')
        trade_entry_type = trade.get_custom_data(key='entry_type')
        if entry_adjustment_count is None:
            if current_profit > 0.01 and (current_time - timedelta(minutes=100) > trade.open_date_utc):
                return True, 'exit_1'
        else
            if entry_adjustment_count > 0 and if current_profit > 0.05:
                return True, 'exit_2'
            if trade_entry_type == 'breakout' and current_profit > 0.1:
                return True, 'exit_3

        return False, None

以上是一个简单的例子——有更简单的方法可以获取交易数据,例如入口调整。

注意

建议使用简单的数据类型 [bool, int, float, str],以确保在序列化需要存储的数据时不会出现问题。存储大量数据可能会导致意外的副作用,例如数据库变得过大(从而变慢)。

不可序列化的数据

如果提供的数据无法被序列化,则会记录一条警告,且指定 key 对应的条目将包含 None 作为数据。

所有属性

custom-data 可通过交易对象(下文假设为 trade)提供以下访问器:

  • trade.get_custom_data(key='something', default=0) - 返回所提供类型的实际值。
  • trade.get_custom_data_entry(key='something') - 返回整个条目(包括元数据)。可通过 .value 属性访问值。
  • trade.set_custom_data(key='something', value={'some': 'value'}) - 设置或更新该交易对应的键。值必须是可序列化的——我们建议保持存储的数据相对较小。

"value" 可以是任意类型(设置和获取时均可),但必须是 JSON 可序列化的。

存储信息(非持久化)

已弃用

此信息存储方法已被弃用,我们不建议使用非持久化存储。
请改用持久化存储

因此其内容已被折叠。

存储信息

可以通过在策略类中创建一个新的字典来实现信息存储。

变量名可自行选择,但应以 custom_ 为前缀,以避免与预定义的策略变量发生命名冲突。

class AwesomeStrategy(IStrategy):
    # Create custom dictionary
    custom_info = {}

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        # Check if the entry already exists
        if not metadata["pair"] in self.custom_info:
            # Create empty entry for this pair
            self.custom_info[metadata["pair"]] = {}

        if "crosstime" in self.custom_info[metadata["pair"]]:
            self.custom_info[metadata["pair"]]["crosstime"] += 1
        else:
            self.custom_info[metadata["pair"]]["crosstime"] = 1

警告

数据在机器人重启(或配置重载)后不会保留。此外,应尽量保持数据量较小(不要存储 DataFrame 等大型对象),否则机器人将消耗大量内存,最终可能导致内存耗尽并崩溃。

注意

如果数据是特定交易对的,请确保在字典中使用交易对作为其中一个键。

访问数据帧

您可以在各种策略函数中通过数据提供器(dataprovider)查询来访问数据帧。

from freqtrade.exchange import timeframe_to_prev_date

class AwesomeStrategy(IStrategy):
    def confirm_trade_exit(self, pair: str, trade: 'Trade', order_type: str, amount: float,
                           rate: float, time_in_force: str, exit_reason: str,
                           current_time: 'datetime', **kwargs) -> bool:
        # Obtain pair dataframe.
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)

        # Obtain last available candle. Do not use current_time to look up latest candle, because 
        # current_time points to current incomplete candle whose data is not available.
        last_candle = dataframe.iloc[-1].squeeze()
        # <...>

        # In dry/live runs trade open date will not match candle open date therefore it must be 
        # rounded.
        trade_date = timeframe_to_prev_date(self.timeframe, trade.open_date_utc)
        # Look up trade candle.
        trade_candle = dataframe.loc[dataframe['date'] == trade_date]
        # trade_candle may be empty for trades that just opened as it is still incomplete.
        if not trade_candle.empty:
            trade_candle = trade_candle.squeeze()
            # <...>

使用 .iloc[-1]

您可以在此处使用 .iloc[-1],因为 get_analyzed_dataframe() 仅返回回测允许查看的 K 线数据。但在 populate_* 方法中无法使用,因此请确保不要在这些区域使用 .iloc[]。此外,此功能仅从版本 2021.5 开始支持。


进入标签

当您的策略有多个入场信号时,可以为触发的信号命名。然后您可以在 custom_exit 中访问该入场信号。

def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    dataframe["enter_tag"] = ""
    signal_rsi = (qtpylib.crossed_above(dataframe["rsi"], 35))
    signal_bblower = (dataframe["bb_lowerband"] < dataframe["close"])
    # Additional conditions
    dataframe.loc[
        (
            signal_rsi
            | signal_bblower
            # ... additional signals to enter a long position
        )
        & (dataframe["volume"] > 0)
            , "enter_long"
        ] = 1
    # Concatenate the tags so all signals are kept
    dataframe.loc[signal_rsi, "enter_tag"] += "long_signal_rsi "
    dataframe.loc[signal_bblower, "enter_tag"] += "long_signal_bblower "

    return dataframe

def custom_exit(self, pair: str, trade: Trade, current_time: datetime, current_rate: float,
                current_profit: float, **kwargs):
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    if "long_signal_rsi" in trade.enter_tag and last_candle["rsi"] > 80:
        return "exit_signal_rsi"
    if "long_signal_bblower" in trade.enter_tag and last_candle["high"] > last_candle["bb_upperband"]:
        return "exit_signal_bblower"
    # ...
    return None

注意

enter_tag 限制为 255 个字符,超出部分将被截断。

警告

只有一个 enter_tag 列,该列同时用于多头和空头交易。因此,该列的行为应视为“最后写入优先”(毕竟它只是一个数据帧列)。在复杂情况下,若多个信号发生冲突(或信号因不同条件被再次关闭),可能导致错误的标签被应用到入场信号上。这些结果是由于策略覆盖了先前的标签——最后一个写入的标签将“保留”,并被 freqtrade 所采用。

退出标签

类似于入场标签,您也可以指定一个退出标签。

def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    dataframe["exit_tag"] = ""
    rsi_exit_signal = (dataframe["rsi"] > 70)
    ema_exit_signal  = (dataframe["ema20"] < dataframe["ema50"])
    # Additional conditions
    dataframe.loc[
        (
            rsi_exit_signal
            | ema_exit_signal
            # ... additional signals to exit a long position
        ) &
        (dataframe["volume"] > 0)
        ,
    "exit_long"] = 1
    # Concatenate the tags so all signals are kept
    dataframe.loc[rsi_exit_signal, "exit_tag"] += "exit_signal_rsi "
    dataframe.loc[rsi_exit_signal2, "exit_tag"] += "exit_signal_rsi "

    return dataframe

提供的退出标签将作为退出原因,并在回测结果中以此显示。

注意

exit_reason 限制为 100 个字符,超出部分将被截断。

策略版本

您可以通过使用 "version" 方法并返回您希望策略拥有的版本号,来实现自定义的策略版本控制。

def version(self) -> str:
    """
    Returns version of the strategy.
    """
    return "1.1"

注意

您应确保同时实施适当的版本控制(例如使用 git 仓库),因为 freqtrade 不会保留您策略的历史版本,因此能否回滚到之前的策略版本完全取决于用户自己。

派生策略

策略可以从其他策略派生,从而避免重复编写自定义策略代码。您可以使用此技术来覆盖主策略中的小部分逻辑,同时保持其余部分不变:

user_data/strategies/myawesomestrategy.py
class MyAwesomeStrategy(IStrategy):
    ...
    stoploss = 0.13
    trailing_stop = False
    # All other attributes and methods are here as they
    # should be in any custom strategy...
    ...
user_data/strategies/MyAwesomeStrategy2.py
from myawesomestrategy import MyAwesomeStrategy
class MyAwesomeStrategy2(MyAwesomeStrategy):
    # Override something
    stoploss = 0.08
    trailing_stop = True

属性和方法均可被重写,从而按需改变原始策略的行为。

虽然从技术上讲可以将子类保留在同一个文件中,但这可能会导致 hyperopt 参数文件出现一些问题。因此,我们建议使用单独的策略文件,并像上面所示那样导入父策略。

嵌入策略

Freqtrade 提供了一种简单的方法,可将策略嵌入到配置文件中。这通过使用 BASE64 编码实现,并在所选配置文件的策略配置字段中提供该字符串。

将字符串编码为 BASE64

以下是一个快速示例,展示如何在 Python 中生成 BASE64 字符串

from base64 import urlsafe_b64encode

with open(file, 'r') as f:
    content = f.read()
content = urlsafe_b64encode(content.encode('utf-8'))

变量 'content' 将包含以 BASE64 编码形式表示的策略文件。现在可以在配置文件中按如下方式设置

"strategy": "NameOfStrategy:BASE64String"

请确保 'NameOfStrategy' 与策略名称完全一致!

性能警告

执行策略时,日志中有时会出现以下内容

PerformanceWarning: DataFrame 高度碎片化。

这是来自 pandas 的警告,正如警告中提到的:应使用 pd.concat(axis=1)。这可能会带来轻微的性能影响,通常仅在进行 hyperopt(优化指标时)才会明显察觉。

例如:

for val in self.buy_ema_short.range:
    dataframe[f'ema_short_{val}'] = ta.EMA(dataframe, timeperiod=val)

应重写为

frames = [dataframe]
for val in self.buy_ema_short.range:
    frames.append(DataFrame({
        f'ema_short_{val}': ta.EMA(dataframe, timeperiod=val)
    }))

# Combine all dataframes, and reassign the original dataframe column
dataframe = pd.concat(frames, axis=1)

不过,Freqtrade 也会在 populate_indicators() 方法执行后立即对 DataFrame 调用 dataframe.copy() 来缓解此问题,因此该问题的性能影响通常很低甚至不存在。