编写一个清算狙击手 V2 策略控制器¶

作者:Patrick Meier
在本文中,我们将探讨如何为 Hummingbot 创建一个自定义的 V2 控制器,以在 Binance 上狙击期货市场的清算事件。我们将使用一个通用控制器来演示该策略的正确用法和实现方式。
清算狙击手策略¶
该策略旨在利用加密货币期货市场中定期发生的清算事件。当这些事件发生时,市场价格往往会出现快速反弹,本策略通过多层加仓(DCA)的方式捕捉这一反弹机会。
该交易策略的目标是抓住加密货币期货市场中清算事件后常见的快速价格反弹。这些清算通常由杠杆交易者被强制平仓引发,导致价格剧烈波动,暂时打破市场平衡。
示例¶
在这张 BTC/USDT 的 30 分钟 K 线图中,红色蜡烛图所示的急剧下跌与多头清算激增同时发生,表明大量杠杆多头仓位被强制关闭。紧随其后的是明显的价格回升,显示市场吸收了清算影响后价格向上修正的反弹过程。

编写策略¶
为了提高可读性,我在此有意省略了导入部分。你可以在此下载完整源代码:liquidations_sniper.py
控制流程¶
让我们快速了解即将构建的控制器的高层控制流程。

- 我们监控 Binance 上发生的清算事件,并在指定的秒数内对其进行汇总
- 如果在指定时间间隔内达到了以报价货币(通常为 USDⓈ-M)计的特定金额,并且当前没有活跃仓位,我们将开仓一笔 DCA 交易
定义配置¶
首先我们需要定义用户可配置的变量。这些变量被分组在一起以便更清晰地展示...
强平配置¶
- trading_connector:下单所使用的连接器(默认值:kucoin_perpetual)
- trading_pair:在所选交易所用于交易的交易对(默认值:XBT-USDT)
- liquidation_side:你希望交易的方向,即做多或做空强平(默认值:LONG,有效值:LONG / SHORT)- 例如,交易多头强平意味着价格正在下跌,而过度加杠杆的多头仓位被强制平仓。当这种情况发生时(在给定时间间隔内触发金额达到设定值),我们将下达 DCA 订单以交易价格反弹
 
- liquidations_pair:在 Binance 上监控强平的交易对(默认值:BTC-USDT)- 重要提示:该策略假设处理的是 USDⓈ-M 永续合约
 
- liquidations_interval_seconds:累计强平金额的时间长度(单位:秒)(默认值:15)
- liquidations_trigger_usd_amount:在强平时间间隔内达到的以美元计价的强平总额,用于触发实际交易(默认值:10,000,000)
DCA 配置¶
- total_amount_quote:用于交易的报价资产总金额(默认值:100)- 通常为美元
 
- dca_levels_percent:每个定投(DCA)层级的百分比值列表,以小数表示,例如 0.01 表示 1%(默认值:0.01, 0.02, 0.03, 0.05)
- dca_amounts_percent:每个 DCA 层级占总报价金额的百分比值列表,以小数表示,例如 0.1 表示 10%(默认值:0.1, 0.2, 0.3, 0.4)- 总和应为 1 = 100%,如果不是,则会将其总和视为 100%
 
- stop_loss:止损比例(默认值:0.03)
- take_profit:止盈比例(默认值:0.01)
- time_limit:时间限制(单位:秒)(默认值:1800 ⇒ 30 分钟)
永续合约配置¶
- leverage:交易时使用的杠杆倍数(默认值:5)。若为现货交易,请设置为 1
- position_mode:仓位模式(默认值:HEDGE,有效值:HEDGE / ONEWAY)
编写配置类代码¶
我们自定义控制器的配置类继承自 ControllerConfigBase,这是一个 Pydantic 模型,提供诸如 controller_name 等标准属性。然后我们使用 Pydantic 来定义字段,附带内置验证功能以及描述信息(这些描述会在 Hummingbot CLI 中显示给用户)。最后,我们添加自定义验证器,以确保配置的健壮性并防止潜在的运行时错误。
...
class LiquidationSniperConfig(ControllerConfigBase):
    """
    This controller executes a strategy that listens for liquidations on Binance for the given pair
    and executes a DCA trade to profit from the rebound. The profitability is highly dependent on the 
    settings you make. 
    Docs: https://www.notion.so/hummingbot-foundation/Liquidation-Sniper-V2-Framework-739dadb04eac4aa6a082067e06ddf7db
    """
    controller_name: str = "liquidations_sniper"
    candles_config: List[CandlesConfig] = []  # do not need any candles for that
    # ---------------------------------------------------------------------------------------
    # Liquidations Config
    # ---------------------------------------------------------------------------------------
    trading_connector: str = Field(
        default="kucoin_perpetual",
        client_data=ClientFieldData(
            prompt=lambda msg: "Enter the trading connector (where the execution of the order shall take place): ",
            prompt_on_new=True
        ))
    trading_pair: str = Field(
        default="XBT-USDT",
        client_data=ClientFieldData(
            prompt=lambda msg: "Enter the trading pair which you want to use for trading: ",
            prompt_on_new=True
        ))
    liquidation_side: LiquidationSide = Field(
        default="LONG",
        client_data=ClientFieldData(
            prompt=lambda msg: "Enter which liquidations you want to trade on (SHORT/LONG). Trading long liquidations "
                               "means price is going down and over-leveraged long positions get forcefully liquidated. "
                               "The strategy would then DCA-Buy into that liquidation and waiting for the rebound: ",
            prompt_on_new=True
        ))
    liquidations_pair: str = Field(
        default="BTC-USDT",
        client_data=ClientFieldData(
            prompt=lambda msg: "Enter the liquidations pair to monitor on Binance: ",
            prompt_on_new=True
        ))
    liquidations_interval_seconds: int = Field(
        default=15,
        client_data=ClientFieldData(
            prompt=lambda msg: "The amount of seconds to accumulate liquidations (e.g. if more than 10Mio USDT "
                               "[=liquidations_trigger_usd_amount] is liquidated in 15s, then place the orders): ",
            prompt_on_new=True
        ))
    liquidations_trigger_usd_amount: int = Field(
        default=10_000_000,  # 10 Mio USDT
        client_data=ClientFieldData(
            is_updatable=True,
            prompt=lambda msg: "The amount of USD that was liquidated in the liquidations-interval to "
                               "actually place the trade: ",
            prompt_on_new=True
        ))
    # ---------------------------------------------------------------------------------------
    # DCA Config
    # ---------------------------------------------------------------------------------------
    total_amount_quote: Decimal = Field(
        default=100,
        client_data=ClientFieldData(
            is_updatable=True,
            prompt_on_new=True,
            prompt=lambda mi: "Enter the total amount in quote asset to use for trading (e.g., 100):"))
    dca_levels_percent: List[Decimal] = Field(
        default="0.01,0.02,0.03,0.05",
        client_data=ClientFieldData(
            prompt_on_new=True,
            is_updatable=True,
            prompt=lambda msg: "Enter a comma-separated list of percentage values where each DCA level should be "
                               "placed (as a decimal, e.g., 0.01 for 1%): "))
    dca_amounts_percent: List[Decimal] = Field(
        default="0.1,0.2,0.3,0.4",
        client_data=ClientFieldData(
            prompt_on_new=True,
            is_updatable=True,
            prompt=lambda msg: "Enter a comma-separated list of percentage values of the total quote amount that "
                               "should be placed at each DCA level (as a decimal, e.g., 0.1 for 10%): "))
    stop_loss: Decimal = Field(
        default=Decimal("0.03"), gt=0,
        client_data=ClientFieldData(
            is_updatable=True,
            prompt=lambda msg: "Enter the stop loss (as a decimal, e.g., 0.03 for 3%): ",
            prompt_on_new=True))
    take_profit: Decimal = Field(
        default=Decimal("0.01"), gte=0,
        client_data=ClientFieldData(
            is_updatable=True,
            prompt=lambda msg: "Enter the take profit (as a decimal, e.g., 0.01 for 1%): ",
            prompt_on_new=True))
    time_limit: int = Field(
        default=60 * 30, gt=0,
        client_data=ClientFieldData(
            is_updatable=True,
            prompt=lambda msg: "Enter the time limit in seconds (e.g., 1800 for 30 minutes): ",
            prompt_on_new=True))
    # ---------------------------------------------------------------------------------------
    # Perp Config
    # ---------------------------------------------------------------------------------------
    leverage: int = Field(
        default=5,
        client_data=ClientFieldData(
            prompt_on_new=True,
            prompt=lambda msg: "Set the leverage to use for trading (e.g., 5 for 5x leverage). "
                               "Set it to 1 for spot trading:"))
    position_mode: PositionMode = Field(
        default="HEDGE",
        client_data=ClientFieldData(
            prompt=lambda msg: "Enter the position mode (HEDGE/ONEWAY): ",
            prompt_on_new=False
        )
    )
    # ---------------------------------------------------------------------------------------
    # Validators
    # ---------------------------------------------------------------------------------------
    @validator('liquidations_pair', pre=True, always=True)
    def validate_usdm_pair(cls, value):
        if "usd" in value.lower():
            return value
        raise ValueError("Liquidations pair must be a USDⓈ-M Future contract!")
    @validator("time_limit", "stop_loss", "take_profit", pre=True, always=True)
    def validate_target(cls, value):
        if isinstance(value, str):
            if value == "":
                return None
            return Decimal(value)
        return value
    @validator('dca_levels_percent', pre=True, always=True)
    def parse_levels(cls, value) -> List[Decimal]:
        if value is None:
            return []
        if isinstance(value, str):
            if value == "":
                return []
            return [Decimal(x.strip()) for x in value.split(',')]
        return value
    @validator('dca_amounts_percent', pre=True, always=True)
    def parse_and_validate_amounts(cls, value, values, field) -> List[Decimal]:
        if value is None or value == "":
            return [Decimal(1) for _ in values[values['dca_levels_percent']]]
        if isinstance(value, str):
            return [Decimal(x.strip()) for x in value.split(',')]
        elif isinstance(value, list) and len(value) != len(values['dca_levels_percent']):
            raise ValueError(
                f"The number of {field.name} must match the number of levels ({len(values['dca_levels_percent'])}).")
        elif isinstance(value, list):
            return [Decimal(amount) for amount in value]
        raise ValueError("DCA amounts per level is invalid!")
    @validator('position_mode', pre=True, allow_reuse=True)
    def validate_position_mode(cls, value: str) -> PositionMode:
        if isinstance(value, str) and value.upper() in PositionMode.__members__:
            return PositionMode[value.upper()]
        raise ValueError(f"Invalid position mode: {value}. Valid options are: {', '.join(PositionMode.__members__)}")
    @validator('liquidation_side', pre=True, always=True)
    def validate_liquidation_side(cls, value: str) -> LiquidationSide:
        if isinstance(value, str) and value.upper() in LiquidationSide.__members__:
            return LiquidationSide[value.upper()]
        raise ValueError(
            f"Invalid liquidation side: {value}. Valid options are: {', '.join(LiquidationSide.__members__)}")
    # ---------------------------------------------------------------------------------------
    # Market Config
    # ---------------------------------------------------------------------------------------
    def update_markets(self, markets: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
        if self.trading_connector not in markets:
            markets[self.trading_connector] = set()
        markets[self.trading_connector].add(self.trading_pair)
        return markets
... 
编写实际的 V2 控制器代码¶
让我们分解实际控制器需要完成的任务:
- 初始化 Binance 的强平数据流
- 在 update_processed_data()中计算并存储当前强平数据
- 在 determine_executor_actions()中检查条件是否满足,并触发交易
- 编写一个辅助方法,根据策略配置来设置 DCAExecutor
- 在 to_format_status()中添加日志信息,以便控制器运行时能实时获取状态信息
...
class LiquidationSniper(ControllerBase):
    def __init__(self, config: LiquidationSniperConfig, *args, **kwargs):
        super().__init__(config, *args, **kwargs)
        self.config = config  # only for type check in IDE
        self.liquidations_feed = None
        self.initialize_liquidations_feed()
        # Make the configuration more forgiving, by calculating the real percentages if not done already
        self.dca_amounts_pct = [Decimal(amount) / sum(self.config.dca_amounts_percent) for amount in
                                self.config.dca_amounts_percent]
    def initialize_liquidations_feed(self):
        liquidations_config = LiquidationsConfig(
            connector="binance",  # use Binance as the most liquid exchange (currently the only feed supported!)
            max_retention_seconds=self.config.liquidations_interval_seconds,
            trading_pairs=[self.config.liquidations_pair]
        )
        self.liquidations_feed = LiquidationsFactory.get_liquidations_feed(liquidations_config)
    def on_start(self):
        self.liquidations_feed.start()
        self.logger().info("Watching for {} liquidations happening on {} (Binance) within {}s to exceed {} USD"
                           .format(self.config.liquidation_side,
                                   self.config.liquidations_pair,
                                   self.config.liquidations_interval_seconds,
                                   self.config.liquidations_trigger_usd_amount))
    def on_stop(self):
        self.liquidations_feed.stop()
    async def update_processed_data(self):
        df = self.liquidations_feed.liquidations_df(self.config.liquidations_pair)
        df['usd_amount'] = df['quantity'] * df['price']
        df = df[df['side'] == self.config.liquidation_side]
        self.processed_data['liquidated_usd_amount'] = df['usd_amount'].sum()
    def determine_executor_actions(self) -> List[ExecutorAction]:
        executor_actions = []
        liquidated_usd_amount = self.processed_data['liquidated_usd_amount']
        trading_executors = self.filter_executors(
            executors=self.executors_info,
            filter_func=lambda executor: executor.is_active and executor.controller_id == self.config.id
        )
        # Only initiate a trade when both criteria is met
        if liquidated_usd_amount >= self.config.liquidations_trigger_usd_amount and len(trading_executors) == 0:
            self.logger().info("The current liquidation-amount ({} USD) in the last {}s is above threshold "
                               "of {} USD => entering trade!".format(liquidated_usd_amount,
                                                                     self.config.liquidations_interval_seconds,
                                                                     self.config.liquidations_trigger_usd_amount))
            executor_actions.append(CreateExecutorAction(
                executor_config=self.get_dca_executor_config(),
                controller_id=self.config.id))
        return executor_actions
    def get_dca_executor_config(self) -> DCAExecutorConfig:
        trade_type = TradeType.BUY if self.config.liquidation_side == LiquidationSide.LONG else TradeType.SELL
        # Use the mid-price to calculate the levels, sl and tp
        price = self.market_data_provider.get_price_by_type(self.config.trading_connector,
                                                            self.config.trading_pair,
                                                            PriceType.MidPrice)
        if trade_type == TradeType.BUY:
            prices = [price * (1 - level) for level in self.config.dca_levels_percent]
        else:
            prices = [price * (1 + level) for level in self.config.dca_levels_percent]
        amounts_quote = [self.config.total_amount_quote * pct for pct in self.dca_amounts_pct]
        return DCAExecutorConfig(
            controller_id=self.config.id,
            timestamp=time.time(),
            connector_name=self.config.trading_connector,
            trading_pair=self.config.trading_pair,
            mode=DCAMode.MAKER,
            leverage=self.config.leverage,
            side=trade_type,
            amounts_quote=amounts_quote,
            prices=prices,
            take_profit=self.config.take_profit,
            stop_loss=self.config.stop_loss,
            time_limit=self.config.time_limit,
        )
    def to_format_status(self) -> List[str]:
        return ["Currently liquidated {} of pair {} in the last {} seconds: {} USD".format(
            self.config.liquidation_side,
            self.config.liquidations_pair,
            self.config.liquidations_interval_seconds,
            self.processed_data['liquidated_usd_amount'])]
...
运行策略¶
完成了所有编码工作后,现在是实际运行它的时候了。假设你已经连接到一个交易所(在本例中为 KuCoin)。
将控制器添加到你的 Hummingbot¶
将文件 liquidations_sniper.py 添加到你的 Hummingbot 安装目录中,如下所示:

创建控制器配置¶
现在启动 Hummingbot,并使用以下命令创建配置:

这里显示的是使用所有默认值的结果。系统会创建一个名为 conf_generic.liquidation_sniper_1.yml 的文件。
使用 generic-script 运行控制器¶
在最后也是最关键的一步中,我们需要为 generic 策略(随 Hummingbot 提供)创建一个非常简单的配置,以实际运行该控制器。
如果你不确定这样做的原因,请阅读文档:Strategies V2 Architecture
创建策略配置¶
接下来,我们必须为 v2_with_controllers 策略创建脚本配置,以便运行我们刚刚创建的控制器。
重要:请在此处输入你在上一步中创建的控制器配置文件名称,否则无法正常运行!

最终,脚本配置文件 conf_v2_with_controllers_1.yml 已创建完成,我们现在可以运行我们的控制器了。
运行策略¶
让我们在 KuCoin 上实时运行该策略……

监控实时状态¶
你可以使用命令:status --live 来监控实际执行情况。

结论¶
现在,你已经掌握了使用新的 V2 框架编写控制器并在 Hummingbot 实例中运行所需的所有知识。
如需了解更多关于 V2 框架的信息,请访问官方文档或查看 Hummingbot 自带示例的源代码。
 
                