跳转至内容

编写一个清算狙击手 V2 策略控制器

cover

作者:Patrick Meier

在本文中,我们将探讨如何为 Hummingbot 创建一个自定义的 V2 控制器来狙击币安上的未来清算。我们将使用一个通用控制器来演示该策略的正确用法和实现。

清算狙击手策略

该策略应当能够利用加密货币期货市场中定期发生的清算。当清算发生时,市场存在快速反弹的倾向,该策略旨在通过使用不同的订单级别(DCA)来捕捉这种反弹。

这个交易策略的目标是利用加密货币期货市场中清算事件后经常出现的快速反弹。这些清算事件由交易者被强制平仓其杠杆头寸所引发,造成了可能暂时破坏市场均衡的急剧价格波动。

示例

在这张 30 分钟的 BTC/USDT 图表上,红色蜡烛线指示的急剧向下价格运动与多头清算的激增同时发生,暗示着杠杆多头头寸的强制平仓级联。紧随此事件之后的是明显的价格恢复,突出了市场价格在吸收清算影响后向上修正的反弹。

编码策略

为了更好的可读性,我在这里有意省略了导入语句。您可以在此处下载完整的源代码: liquidations_sniper.py

控制流程

让我们快速了解一下我们将要构建的控制器的高层控制流程。

  • 我们监控在币安上发生的清算并在指定的秒数内进行汇总
  • 如果在该时间间隔内达到特定金额的报价货币(通常是 USDⓈ-M)且没有活跃头寸存在,我们开立 DCA 交易

定义配置

首先我们需要定义用户可以配置的变量。它们被分组在一起以获得更好的概览...

清算配置

  • trading_connector: 订单应放置的连接器 (默认: kucoin_perpetual)
  • trading_pair: 您在所选交易所上用于交易的交易对 (默认: XBT-USDT)
  • liquidation_side: 您想交易哪一方,长线或短线清算 (默认: LONG, 有效: LONG / SHORT)
    • 例如,交易长线清算意味着价格正在下跌,高杠杆的多头头寸被清算。如果发生这种情况(当给定间隔内的触发金额达到时),我们下 DCA 订单来交易反弹
  • liquidations_pair: 监控币安上清算的交易对 (默认: BTC-USDT)
    • 重要: 该策略假定处理 USDⓈ-M 期货
  • liquidations_interval_seconds: 汇集清算的秒数 (默认: 15)
  • liquidations_trigger_usd_amount: 在清算间隔内实际下单交易的被清算的 USD 金额 (默认: 10,000,000)

DCA 配置

  • total_amount_quote: 用于交易的报价资产总金额(默认值:100)
    • 通常为 USD
  • dca_levels_percent: 以小数形式表示的每个定投(DCA)级别的百分比值列表,用逗号分隔,例如 0.01 表示 1%(默认值:0.01, 0.02, 0.03, 0.05)
  • dca_amounts_percent: 以小数形式表示的每个 DCA 级别从 total_quote_amount 中所占百分比值列表,用逗号分隔,例如 0.1 表示 10%(默认值:0.1, 0.2, 0.3, 0.4)
    • 总和应为 1 = 100%,如果不是,则按总和处理并视为 100%
  • stop_loss: 止损百分比(默认值:0.03)
  • take_profit: 止盈百分比(默认值:0.01)
  • time_limit: 时间限制(秒)(默认值:1800 ⇒ 30 分钟)

永续合约配置

  • leverage: 用于交易的杠杆(默认值:5)。现货交易设为 1
  • position_mode: 仓位模式(默认值:HEDGE,有效值:HEDGE / ONEWAY)

编写配置类

我们自定义控制器的配置类继承自 ControllerConfigBase,这是一个 Pydantic 模型,提供了 controller_name 等标准属性。然后我们使用 Pydantic 定义具有内置验证的字段并提供描述(这些描述会在 Hummingbot CLI 中显示给用户)。最后,我们添加自定义验证器以确保配置的健壮性并防止潜在的运行时错误。

...

class LiquidationSniperConfig(ControllerConfigBase):
    """
    This controller executes a strategy that listens for liquidations on Binance for the given pair
    and executes a DCA trade to profit from the rebound. The profitability is highly dependent on the 
    settings you make. 

    Docs: https://www.notion.so/hummingbot-foundation/Liquidation-Sniper-V2-Framework-739dadb04eac4aa6a082067e06ddf7db
    """

    controller_name: str = "liquidations_sniper"
    candles_config: List[CandlesConfig] = []  # do not need any candles for that

    # ---------------------------------------------------------------------------------------
    # Liquidations Config
    # ---------------------------------------------------------------------------------------

    trading_connector: str = Field(
        default="kucoin_perpetual",
        client_data=ClientFieldData(
            prompt=lambda msg: "Enter the trading connector (where the execution of the order shall take place): ",
            prompt_on_new=True
        ))
    trading_pair: str = Field(
        default="XBT-USDT",
        client_data=ClientFieldData(
            prompt=lambda msg: "Enter the trading pair which you want to use for trading: ",
            prompt_on_new=True
        ))
    liquidation_side: LiquidationSide = Field(
        default="LONG",
        client_data=ClientFieldData(
            prompt=lambda msg: "Enter which liquidations you want to trade on (SHORT/LONG). Trading long liquidations "
                               "means price is going down and over-leveraged long positions get forcefully liquidated. "
                               "The strategy would then DCA-Buy into that liquidation and waiting for the rebound: ",
            prompt_on_new=True
        ))
    liquidations_pair: str = Field(
        default="BTC-USDT",
        client_data=ClientFieldData(
            prompt=lambda msg: "Enter the liquidations pair to monitor on Binance: ",
            prompt_on_new=True
        ))
    liquidations_interval_seconds: int = Field(
        default=15,
        client_data=ClientFieldData(
            prompt=lambda msg: "The amount of seconds to accumulate liquidations (e.g. if more than 10Mio USDT "
                               "[=liquidations_trigger_usd_amount] is liquidated in 15s, then place the orders): ",
            prompt_on_new=True
        ))
    liquidations_trigger_usd_amount: int = Field(
        default=10_000_000,  # 10 Mio USDT
        client_data=ClientFieldData(
            is_updatable=True,
            prompt=lambda msg: "The amount of USD that was liquidated in the liquidations-interval to "
                               "actually place the trade: ",
            prompt_on_new=True
        ))

    # ---------------------------------------------------------------------------------------
    # DCA Config
    # ---------------------------------------------------------------------------------------

    total_amount_quote: Decimal = Field(
        default=100,
        client_data=ClientFieldData(
            is_updatable=True,
            prompt_on_new=True,
            prompt=lambda mi: "Enter the total amount in quote asset to use for trading (e.g., 100):"))
    dca_levels_percent: List[Decimal] = Field(
        default="0.01,0.02,0.03,0.05",
        client_data=ClientFieldData(
            prompt_on_new=True,
            is_updatable=True,
            prompt=lambda msg: "Enter a comma-separated list of percentage values where each DCA level should be "
                               "placed (as a decimal, e.g., 0.01 for 1%): "))
    dca_amounts_percent: List[Decimal] = Field(
        default="0.1,0.2,0.3,0.4",
        client_data=ClientFieldData(
            prompt_on_new=True,
            is_updatable=True,
            prompt=lambda msg: "Enter a comma-separated list of percentage values of the total quote amount that "
                               "should be placed at each DCA level (as a decimal, e.g., 0.1 for 10%): "))
    stop_loss: Decimal = Field(
        default=Decimal("0.03"), gt=0,
        client_data=ClientFieldData(
            is_updatable=True,
            prompt=lambda msg: "Enter the stop loss (as a decimal, e.g., 0.03 for 3%): ",
            prompt_on_new=True))
    take_profit: Decimal = Field(
        default=Decimal("0.01"), gte=0,
        client_data=ClientFieldData(
            is_updatable=True,
            prompt=lambda msg: "Enter the take profit (as a decimal, e.g., 0.01 for 1%): ",
            prompt_on_new=True))
    time_limit: int = Field(
        default=60 * 30, gt=0,
        client_data=ClientFieldData(
            is_updatable=True,
            prompt=lambda msg: "Enter the time limit in seconds (e.g., 1800 for 30 minutes): ",
            prompt_on_new=True))

    # ---------------------------------------------------------------------------------------
    # Perp Config
    # ---------------------------------------------------------------------------------------

    leverage: int = Field(
        default=5,
        client_data=ClientFieldData(
            prompt_on_new=True,
            prompt=lambda msg: "Set the leverage to use for trading (e.g., 5 for 5x leverage). "
                               "Set it to 1 for spot trading:"))
    position_mode: PositionMode = Field(
        default="HEDGE",
        client_data=ClientFieldData(
            prompt=lambda msg: "Enter the position mode (HEDGE/ONEWAY): ",
            prompt_on_new=False
        )
    )

    # ---------------------------------------------------------------------------------------
    # Validators
    # ---------------------------------------------------------------------------------------

    @validator('liquidations_pair', pre=True, always=True)
    def validate_usdm_pair(cls, value):
        if "usd" in value.lower():
            return value
        raise ValueError("Liquidations pair must be a USDⓈ-M Future contract!")

    @validator("time_limit", "stop_loss", "take_profit", pre=True, always=True)
    def validate_target(cls, value):
        if isinstance(value, str):
            if value == "":
                return None
            return Decimal(value)
        return value

    @validator('dca_levels_percent', pre=True, always=True)
    def parse_levels(cls, value) -> List[Decimal]:
        if value is None:
            return []
        if isinstance(value, str):
            if value == "":
                return []
            return [Decimal(x.strip()) for x in value.split(',')]
        return value

    @validator('dca_amounts_percent', pre=True, always=True)
    def parse_and_validate_amounts(cls, value, values, field) -> List[Decimal]:
        if value is None or value == "":
            return [Decimal(1) for _ in values[values['dca_levels_percent']]]
        if isinstance(value, str):
            return [Decimal(x.strip()) for x in value.split(',')]
        elif isinstance(value, list) and len(value) != len(values['dca_levels_percent']):
            raise ValueError(
                f"The number of {field.name} must match the number of levels ({len(values['dca_levels_percent'])}).")
        elif isinstance(value, list):
            return [Decimal(amount) for amount in value]
        raise ValueError("DCA amounts per level is invalid!")

    @validator('position_mode', pre=True, allow_reuse=True)
    def validate_position_mode(cls, value: str) -> PositionMode:
        if isinstance(value, str) and value.upper() in PositionMode.__members__:
            return PositionMode[value.upper()]
        raise ValueError(f"Invalid position mode: {value}. Valid options are: {', '.join(PositionMode.__members__)}")

    @validator('liquidation_side', pre=True, always=True)
    def validate_liquidation_side(cls, value: str) -> LiquidationSide:
        if isinstance(value, str) and value.upper() in LiquidationSide.__members__:
            return LiquidationSide[value.upper()]
        raise ValueError(
            f"Invalid liquidation side: {value}. Valid options are: {', '.join(LiquidationSide.__members__)}")

    # ---------------------------------------------------------------------------------------
    # Market Config
    # ---------------------------------------------------------------------------------------

    def update_markets(self, markets: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
        if self.trading_connector not in markets:
            markets[self.trading_connector] = set()
        markets[self.trading_connector].add(self.trading_pair)
        return markets

... 

编写实际的 V2 控制器

让我们分解实际控制器的任务:

  • 初始化 Binance 的清算数据流
  • update_processed_data() 中计算并存储当前清算数据
  • determine_executor_actions() 中检查条件是否满足并发起交易
  • 编写一个辅助方法,根据策略配置配置 DCAExecutor
  • to_format_status() 中添加日志信息,以便在控制器运行时获取实时信息
...
class LiquidationSniper(ControllerBase):

    def __init__(self, config: LiquidationSniperConfig, *args, **kwargs):
        super().__init__(config, *args, **kwargs)
        self.config = config  # only for type check in IDE
        self.liquidations_feed = None
        self.initialize_liquidations_feed()
        # Make the configuration more forgiving, by calculating the real percentages if not done already
        self.dca_amounts_pct = [Decimal(amount) / sum(self.config.dca_amounts_percent) for amount in
                                self.config.dca_amounts_percent]

    def initialize_liquidations_feed(self):
        liquidations_config = LiquidationsConfig(
            connector="binance",  # use Binance as the most liquid exchange (currently the only feed supported!)
            max_retention_seconds=self.config.liquidations_interval_seconds,
            trading_pairs=[self.config.liquidations_pair]
        )
        self.liquidations_feed = LiquidationsFactory.get_liquidations_feed(liquidations_config)

    def on_start(self):
        self.liquidations_feed.start()
        self.logger().info("Watching for {} liquidations happening on {} (Binance) within {}s to exceed {} USD"
                           .format(self.config.liquidation_side,
                                   self.config.liquidations_pair,
                                   self.config.liquidations_interval_seconds,
                                   self.config.liquidations_trigger_usd_amount))

    def on_stop(self):
        self.liquidations_feed.stop()

    async def update_processed_data(self):
        df = self.liquidations_feed.liquidations_df(self.config.liquidations_pair)
        df['usd_amount'] = df['quantity'] * df['price']
        df = df[df['side'] == self.config.liquidation_side]
        self.processed_data['liquidated_usd_amount'] = df['usd_amount'].sum()

    def determine_executor_actions(self) -> List[ExecutorAction]:
        executor_actions = []
        liquidated_usd_amount = self.processed_data['liquidated_usd_amount']
        trading_executors = self.filter_executors(
            executors=self.executors_info,
            filter_func=lambda executor: executor.is_active and executor.controller_id == self.config.id
        )

        # Only initiate a trade when both criteria is met
        if liquidated_usd_amount >= self.config.liquidations_trigger_usd_amount and len(trading_executors) == 0:
            self.logger().info("The current liquidation-amount ({} USD) in the last {}s is above threshold "
                               "of {} USD => entering trade!".format(liquidated_usd_amount,
                                                                     self.config.liquidations_interval_seconds,
                                                                     self.config.liquidations_trigger_usd_amount))
            executor_actions.append(CreateExecutorAction(
                executor_config=self.get_dca_executor_config(),
                controller_id=self.config.id))
        return executor_actions

    def get_dca_executor_config(self) -> DCAExecutorConfig:
        trade_type = TradeType.BUY if self.config.liquidation_side == LiquidationSide.LONG else TradeType.SELL

        # Use the mid-price to calculate the levels, sl and tp
        price = self.market_data_provider.get_price_by_type(self.config.trading_connector,
                                                            self.config.trading_pair,
                                                            PriceType.MidPrice)
        if trade_type == TradeType.BUY:
            prices = [price * (1 - level) for level in self.config.dca_levels_percent]
        else:
            prices = [price * (1 + level) for level in self.config.dca_levels_percent]

        amounts_quote = [self.config.total_amount_quote * pct for pct in self.dca_amounts_pct]

        return DCAExecutorConfig(
            controller_id=self.config.id,
            timestamp=time.time(),
            connector_name=self.config.trading_connector,
            trading_pair=self.config.trading_pair,
            mode=DCAMode.MAKER,
            leverage=self.config.leverage,
            side=trade_type,
            amounts_quote=amounts_quote,
            prices=prices,
            take_profit=self.config.take_profit,
            stop_loss=self.config.stop_loss,
            time_limit=self.config.time_limit,
        )

    def to_format_status(self) -> List[str]:
        return ["Currently liquidated {} of pair {} in the last {} seconds: {} USD".format(
            self.config.liquidation_side,
            self.config.liquidations_pair,
            self.config.liquidations_interval_seconds,
            self.processed_data['liquidated_usd_amount'])]
...

运行策略

现在我们已经完成了所有编码,是时候实际运行它了。假设您已经连接到交易所(在此示例中为 KuCoin)。

将控制器添加到您的 Hummingbot

像这样将文件 liquidations_sniper.py 添加到您的 Hummingbot 安装目录:

创建控制器配置

现在启动 Hummingbot 并使用此命令创建配置:

create --controller-config generic.liquidation_sniper

这里您看到的是使用所有默认值的结果。创建了一个名为 conf_generic.liquidation_sniper_1.yml 的文件。

使用通用脚本运行控制器

在最后也是最终步骤中,我们需要为通用策略(随 Hummingbot 提供)创建一个非常简单的配置以实际运行控制器。

如果您不确定这样做的原因,只需阅读文档:Strategies V2 Architecture

创建策略配置

接下来我们必须为 v2_with_controllers 策略创建脚本配置以运行我们新创建的控制器。

create --script-config v2_with_controllers

重要:使用您在上一步中创建的控制器配置文件的名称作为输入,这样所有内容才能正常工作!

最后脚本配置 conf_v2_with_controllers_1.yml 已创建,我们可以运行我们的控制器。

运行策略

让我们在 KuCoin 上实时运行策略...

start --script v2_with_controllers.py --conf conf_v2_with_controllers_1.yml

监现实时状态

您可以使用以下命令监真实执行情况:status --live

结论

现在你已经掌握了使用新 V2 框架编写控制器并在你的 Hummingbot 实例中运行它们所需的全部知识。

有关 V2 框架的更多信息,请访问文档或查看随 Hummingbot 提供的示例源代码。