跳至内容

生产者 / 消费者模式

freqtrade 提供了一种机制,其中一个实例(也称为 消费者)可以通过消息 websocket 监听上游 freqtrade 实例(也称为 生产者)发送的消息。主要消息类型包括 analyzed_dfwhitelist。这使得多个机器人可以重复使用已计算的指标(和信号),而无需多次重复计算。

请参阅 Rest API 文档中的 消息 Websocket,了解如何为你的消息 websocket 配置 api_server(这将是你的生产者)。

注意

我们强烈建议将 ws_token 设置为只有你自己知道的随机字符串,以避免他人未经授权访问你的机器人。

配置

通过在消费者的配置文件中添加 external_message_consumer 部分来启用订阅功能。

{
    //...
   "external_message_consumer": {
        "enabled": true,
        "producers": [
            {
                "name": "default", // This can be any name you'd like, default is "default"
                "host": "127.0.0.1", // The host from your producer's api_server config
                "port": 8080, // The port from your producer's api_server config
                "secure": false, // Use a secure websockets connection, default false
                "ws_token": "sercet_Ws_t0ken" // The ws_token from your producer's api_server config
            }
        ],
        // The following configurations are optional, and usually not required
        // "wait_timeout": 300,
        // "ping_timeout": 10,
        // "sleep_time": 10,
        // "remove_entry_exit_signals": false,
        // "message_size_limit": 8
    }
    //...
}
参数 描述
enabled 必填。 启用消费者模式。如果设置为 false,则此部分中的所有其他设置都将被忽略。
默认值为 false
数据类型: 布尔值。
producers 必填。 生产者列表
数据类型: 数组。
producers.name 必填。 此生产者的名称。当使用多个生产者时,调用 get_producer_pairs()get_producer_df() 必须使用此名称。
数据类型: 字符串
producers.host 必填。 生产者的主机名或 IP 地址。
数据类型: 字符串
producers.port 必填。 与上述主机匹配的端口。
默认值为 8080
数据类型: 整数
producers.secure 可选。 在 websocket 连接中使用 SSL。默认为 False。
数据类型: 字符串
producers.ws_token 必填。 在生产者上配置的 ws_token
数据类型: 字符串
可选设置
wait_timeout 在未收到消息的情况下,等待多长时间后再次发送 ping。
默认值为 300
数据类型: 整数 - 单位为秒。
ping_timeout ping 超时时间
默认值为 10
数据类型: 整数 - 单位为秒。
sleep_time 重新尝试连接前的休眠时间。
默认值为 10
数据类型:整数 - 单位为秒。
remove_entry_exit_signals 在接收到数据框时,移除其中的信号列(将其设为 0)。
默认值为 false
数据类型:布尔值。
initial_candle_limit 从生产者处最初期望获取的 K 线数量。
默认值为 1500
数据类型:整数 - K 线数量。
message_size_limit 每条消息的大小限制
默认值为 8
数据类型:整数 - 兆字节(MB)。

关注者实例无需(或除了)在 populate_indicators() 中计算指标,而是监听来自生产者实例的消息(或在高级配置中多个生产者实例),并请求生产者为当前白名单中每个交易对提供的最新分析后的数据框。

这样一来,消费者实例将完整复制已分析的数据框,而无需自行计算。

示例

示例 - 生产者策略

一个包含多个指标的简单策略。该策略本身无需特殊处理。

class ProducerStrategy(IStrategy):
    #...
    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Calculate indicators in the standard freqtrade way which can then be broadcast to other instances
        """
        dataframe['rsi'] = ta.RSI(dataframe)
        bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
        dataframe['bb_lowerband'] = bollinger['lower']
        dataframe['bb_middleband'] = bollinger['mid']
        dataframe['bb_upperband'] = bollinger['upper']
        dataframe['tema'] = ta.TEMA(dataframe, timeperiod=9)

        return dataframe

    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Populates the entry signal for the given dataframe
        """
        dataframe.loc[
            (
                (qtpylib.crossed_above(dataframe['rsi'], self.buy_rsi.value)) &
                (dataframe['tema'] <= dataframe['bb_middleband']) &
                (dataframe['tema'] > dataframe['tema'].shift(1)) &
                (dataframe['volume'] > 0)
            ),
            'enter_long'] = 1

        return dataframe

FreqAI

你可以使用此方法在高性能机器上设置 FreqAI,同时在树莓派等简单设备上运行消费者实例,这些设备可以以不同方式解释生产者生成的信号。

示例 - 消费者策略

一个逻辑上等效的策略,其本身不进行任何指标计算,但可以获得与生产者计算出的相同分析数据框,从而基于这些指标做出交易决策。本示例中消费者的进出仓条件与生产者相同,但这并非必需。消费者可以使用不同的逻辑来开仓或平仓,仅按需使用指定的指标。

class ConsumerStrategy(IStrategy):
    #...
    process_only_new_candles = False # required for consumers

    _columns_to_expect = ['rsi_default', 'tema_default', 'bb_middleband_default']

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Use the websocket api to get pre-populated indicators from another freqtrade instance.
        Use `self.dp.get_producer_df(pair)` to get the dataframe
        """
        pair = metadata['pair']
        timeframe = self.timeframe

        producer_pairs = self.dp.get_producer_pairs()
        # You can specify which producer to get pairs from via:
        # self.dp.get_producer_pairs("my_other_producer")

        # This func returns the analyzed dataframe, and when it was analyzed
        producer_dataframe, _ = self.dp.get_producer_df(pair)
        # You can get other data if the producer makes it available:
        # self.dp.get_producer_df(
        #   pair,
        #   timeframe="1h",
        #   candle_type=CandleType.SPOT,
        #   producer_name="my_other_producer"
        # )

        if not producer_dataframe.empty:
            # If you plan on passing the producer's entry/exit signal directly,
            # specify ffill=False or it will have unintended results
            merged_dataframe = merge_informative_pair(dataframe, producer_dataframe,
                                                      timeframe, timeframe,
                                                      append_timeframe=False,
                                                      suffix="default")
            return merged_dataframe
        else:
            dataframe[self._columns_to_expect] = 0

        return dataframe

    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Populates the entry signal for the given dataframe
        """
        # Use the dataframe columns as if we calculated them ourselves
        dataframe.loc[
            (
                (qtpylib.crossed_above(dataframe['rsi_default'], self.buy_rsi.value)) &
                (dataframe['tema_default'] <= dataframe['bb_middleband_default']) &
                (dataframe['tema_default'] > dataframe['tema_default'].shift(1)) &
                (dataframe['volume'] > 0)
            ),
            'enter_long'] = 1

        return dataframe

使用上游信号

通过设置 remove_entry_exit_signals=false,你也可以直接使用生产者的信号。这些信号将作为 enter_long_default 可用(假设使用了 suffix="default"),既可直接作为交易信号,也可作为附加指标使用。