6. 高级策略模式

6.高级策略模式

在掌握了基础的交易进出逻辑后,策略开发往往会遇到更复杂的需求。比如,想在不同价位分批建仓,或者根据市场变化动态调整止盈止损,甚至让多个交易对之间产生联动。Jesse 的设计哲学是"简单事情简单做,复杂事情可行",这些高级模式正是为了解决实际交易中的精细控制需求。

多点进出管理

传统策略通常是一次性开仓、一次性平仓。但真实市场往往需要更灵活的操作:突破时先建半仓,回调再加半仓;或者盈利时分批止盈,让利润奔跑的同时锁定部分收益。

分批出场

go_long()go_short() 中,self.take_profitself.stop_loss 支持列表语法。每个元素是一个元组,包含(数量,价格)。Jesse 会自动处理这些订单,当价格触发时执行对应的平仓操作。

def go_long(self):
    qty = 2
    
    self.buy = qty, self.price
    # 在 120 和 140 两个价位分批止盈
    self.take_profit = [
        (qty / 2, 120),  # 第一单:平掉 1 个仓位
        (qty / 2, 140)   # 第二单:平掉剩余 1 个仓位
    ]
    self.stop_loss = qty, 80  # 全部仓位统一止损

这段代码展示了最经典的分批止盈策略。入场后,系统会同时挂出两个止盈单和一个止损单。当价格达到 120 时,自动平仓一半;达到 140 时,平仓另一半。止损单始终保护全部仓位。这种设计让策略逻辑清晰,不需要手动跟踪哪个订单已执行。

分批入场

同样原理适用于分批建仓。通过列表语法,可以在不同价位逐步建立头寸。

def go_long(self):
    # 计划分两次买入,每次 1 个单位
    self.buy = [
        (1, 120),  # 第一笔:价格达到 120 时买入
        (1, 140)   # 第二笔:价格达到 140 时再买入
    ]
    self.stop_loss = 2, 100  # 全部仓位统一止损
    self.take_profit = 2, 160

这种模式下,Jesse 会先提交第一个限价单。当 120 的订单成交后,系统会自动提交第二个 140 的订单。如果价格直接上涨,可能只成交第一笔;如果回调,两笔都可能成交。reduced_count 属性在这里派上用场,它记录了仓位被减少的次数。

def go_long(self):
    self.buy = 2, 100
    self.take_profit = [
        (1, 120),
        (1, 140)
    ]

def update_position(self):
    # 当第一笔止盈成交后,调整剩余仓位的止损到成本价
    if self.reduced_count > 0:
        self.stop_loss = 1, 100

reduced_count 从 0 开始,每次止盈或止损成交导致仓位减少时,这个值就会增加。利用它,可以实现"移动止损到保本"的经典风控手法。第一笔止盈后,将剩余仓位的止损价调整到入场价,确保后续波动不会亏损。

动态调整未成交订单

有时入场后,市场结构发生变化,需要取消之前的挂单。should_cancel_entry() 方法专门处理这个场景。

def should_long(self):
    return True

def go_long(self):
    # 在最高价上方 2 个点挂突破单
    self.buy = 1, self.high + 2

def should_cancel_entry(self):
    # 如果新蜡烛形成后,价格反而下跌,取消旧订单
    return self.close < self.open

这个例子中,策略试图捕捉向上突破。但如果市场未能延续强势,反而收阴线,说明突破可能失败。此时返回 True 取消挂单的止损单,避免在弱势中追高。注意,这个方法只影响入场订单,不影响已成交仓位后的止盈止损。

动态仓位更新

市场瞬息万变,预先设定的止盈止损有时需要调整。update_position() 方法在每次新蜡烛到来时,只要仓位是打开的,就会被调用。这是实现移动止损、动态止盈、加仓减仓的核心机制。

移动止盈止损

趋势跟踪策略的典型做法是让利润奔跑,同时用移动止损保护收益。

def update_position(self):
    qty = self.position.qty
    
    # 多头仓位:止盈设在当前蜡烛高点下方 10 个点
    if self.is_long:
        self.take_profit = qty, self.high - 10
    # 空头仓位:止盈设在当前蜡烛低点上方 10 个点
    elif self.is_short:
        self.take_profit = qty, self.low + 10

每根新蜡烛都会重新计算止盈价。对于多头,如果价格不断创出新高,止盈价也会跟随上移;如果价格回落,止盈价保持不变(因为 self.high 是新的高点)。这样就实现了"只上移不下移"的移动止盈逻辑。

条件平仓

有时需要根据指标信号主动平仓,而不是等待价格触及预设点位。

def update_position(self):
    # 多头仓位且 RSI 达到超买区,立即平仓
    if self.is_long and ta.rsi(self.candles) > 80:
        self.liquidate()

liquidate() 是平仓的快捷方法,它会以市价单平掉所有仓位,并自动取消相关订单。这种用法在震荡策略中很常见:突破入场,但指标显示超买超卖时提前离场,不等待止损触发。

浮盈加仓

金字塔加仓是经典手法,但风险较高。Jesse 允许在 update_position() 中继续下单。

def update_position(self):
    # 多头仓位盈利超过 5%,且 RSI 显示超卖,加仓一倍
    if self.is_long and self.position.pnl_percentage > 5:
        if ta.rsi(self.candles) < 30:
            self.buy = self.position.qty, self.price

这里在原有仓位盈利的基础上,遇到回调(RSI 超卖)时加倍仓位。self.position.qty 获取当前仓位数量,买入相同数量就实现了仓位翻倍。注意,加仓后需要重新设置止损止盈,否则新仓位没有保护。

仓位事件处理

事件驱动是 Jesse 的一大特色。当仓位状态发生变化时,系统会自动调用对应的事件方法。这让策略能精确响应市场执行,而不是被动等待下一根蜡烛。

基础事件

on_open_position 在入场订单成交瞬间触发,此时 self.position 已经可用。

def on_open_position(self, order):
    # 记录开仓信息
    self.log(f"仓位已开,入场价: {self.position.entry_price}, 数量: {self.position.qty}")

on_close_position 在仓位完全平仓时触发。通过 order 参数可以判断是止盈还是止损。

def on_close_position(self, order):
    if order.is_take_profit:
        self.log("止盈平仓")
    elif order.is_stop_loss:
        self.log("止损平仓")

on_reduced_position 在仓位部分平仓时触发,比如分批止盈成交了一半。

def go_long(self):
    self.buy = 2, 100
    self.take_profit = [
        (1, 120),
        (1, 140)
    ]

def on_reduced_position(self, order):
    # 第一笔止盈成交后,将止损移动到成本价
    self.stop_loss = 1, 100

on_increased_position 在加仓时触发,无论是 go_long() 中的分批入场,还是 update_position() 中的动态加仓。

def on_increased_position(self, order):
    # 加仓后,重新计算总仓位的止损
    total_qty = self.position.qty
    self.stop_loss = total_qty, self.position.entry_price * 0.95

跨策略通信事件

多品种策略需要协同操作时,Jesse 提供了路由事件。假设同时交易 BTC 和 ETH,当 ETH 开仓时,想平掉 BTC 的仓位。

def on_route_open_position(self, strategy):
    # 当 ETH-USDT 策略开仓时,平掉当前策略的仓位
    if self.is_open and strategy.symbol == 'ETH-USDT':
        self.liquidate()

类似的事件还有 on_route_close_positionon_route_increased_positionon_route_reduced_positionon_route_canceled。这些事件接收 strategy 参数,可以访问其他策略的符号、仓位等信息。

跨策略通信需要配合 self.shared_vars 共享数据。比如配对交易策略,一个策略做多 A,另一个做空 B,通过共享变量传递信号。

def go_long(self):
    # 开仓时标记信号
    self.shared_vars['signal'] = 'long'
    
def on_route_open_position(self, strategy):
    # 另一个策略读取信号并反向操作
    if self.shared_vars.get('signal') == 'long':
        self.sell = 1, self.price

策略过滤器

过滤器是 Jesse 的独特设计,用于将入场条件与过滤条件分离。当策略需要多个条件同时满足,但又不想在 should_long() 里写复杂判断时,过滤器是最佳选择。

基本用法

首先定义 filters() 方法,返回一个过滤器函数列表。然后编写过滤器函数,函数名随意,建议包含 filter

def filters(self):
    return [
        self.trend_filter,
        self.volatility_filter
    ]

def trend_filter(self):
    # 只在上升趋势中交易
    return self.close > ta.ema(self.candles, 50)

def volatility_filter(self):
    # 波动率不能太大
    return ta.atr(self.candles, 14) < self.close * 0.02

关键点是 filters() 返回的是函数对象,不是调用结果。所以不能加括号,这是新手常犯的错误。

错误写法:

def filters(self):
    return [self.trend_filter()]  # 这会立即执行,返回布尔值

正确写法:

def filters(self):
    return [self.trend_filter]    # 返回函数本身,让框架在适当时机调用

过滤器的优势

过滤器能访问入场和出场价格,这是 should_long() 做不到的。因为 should_long() 只决定"是否交易",而具体价格是在 go_long() 中定义的。过滤器在两者之后执行,可以拿到最终计算好的价格。

def minimum_pnl_filter(self):
    # 确保每笔交易的预期盈利大于 1%
    reward = abs(self.average_take_profit - self.average_entry_price)
    pnl_percentage = (reward / self.average_entry_price) * 100
    return pnl_percentage > 1

self.average_entry_priceself.average_take_profit 是 Jesse 自动计算的平均值,考虑了分批入场和出场的情况。如果策略的盈亏比不达标,即使 should_long() 返回 True,也不会开仓。

另一个优势是调试方便。当过滤器阻止交易时,Jesse 会在日志中记录哪个过滤器未通过,这比在 should_long() 里调试复杂的 if-else 要直观得多。

前后置钩子

before()after() 是策略执行流程的两端,分别在每个蜡烛开始时和结束后调用。它们不直接参与交易决策,而是用于状态管理和数据准备。

before() 前置处理

在新蜡烛数据刚到达,但策略逻辑尚未执行时调用。适合更新自定义变量、预计算指标等。

def before(self):
    # 记录当前蜡烛的开盘时间
    self.vars['candle_start'] = self.time
    
    # 预计算一些复杂指标,供后续方法使用
    self.vars['rsi'] = ta.rsi(self.candles, 14)
    self.vars['ema_fast'] = ta.ema(self.candles, 20)
    self.vars['ema_slow'] = ta.ema(self.candles, 50)

self.vars 是策略实例的私有字典,生命周期与策略相同。与 self.shared_vars 不同,它不能在策略间共享,适合存放当前策略的状态。

after() 后置处理

在策略所有逻辑执行完毕后调用,包括 update_position() 和事件处理。适合清理状态、记录日志等。

def after(self):
    # 检查是否有未处理的订单
    if self.orders:
        self.log(f"活跃订单数: {len(self.orders)}")
    
    # 更新自定义计数器
    if self.is_open:
        self.vars['bars_since_entry'] = self.vars.get('bars_since_entry', 0) + 1
    else:
        self.vars['bars_since_entry'] = 0

这个例子统计了持仓的蜡烛数量。self.orders 返回当前所有活跃订单,可以用来监控挂单状态。

完整执行流程

理解这些钩子的调用顺序对排查问题至关重要。Jesse 的策略执行流程如下:

graph TD
    A[新蜡烛到达] --> B[before()]
    B --> C{仓位是否打开?}
    C -->|否| D[should_long()/should_short()]
    D --> E[should_cancel_entry()]
    C -->|是| F[update_position()]
    F --> G[事件处理]
    E --> H[go_long()/go_short()]
    H --> I[提交订单]
    G --> J[after()]
    I --> J

before() 最先执行,after() 最后执行。无论是否有仓位,这两个方法都会被调用。而 should_long()update_position() 等则根据仓位状态选择性执行。

生命周期管理

策略还有开始和结束的特殊方法,虽然不常用,但在某些场景下很关键。

init() 初始化

策略类实例化时调用一次,适合加载模型、初始化参数。

def __init__(self):
    super().__init__()
    
    # 加载机器学习模型
    import pickle
    with open('model.pkl', 'rb') as f:
        self.model = pickle.load(f)
    
    # 初始化自定义变量
    self.vars['trade_count'] = 0

必须首先调用 super().__init__(),否则框架初始化会失败。这里可以做一些重量级操作,因为只执行一次。

terminate() 终止处理

回测或实盘结束时调用,适合保存数据、清理资源。

def terminate(self):
    # 保存回测统计
    import json
    stats = {
        'total_trades': len(self.trades),
        'win_rate': self.win_rate
    }
    with open('stats.json', 'w') as f:
        json.dump(stats, f)
    
    self.log('策略执行结束')

before_terminate()terminate() 之前调用,区别是前者还能提交订单,后者只能做只读操作。比如实盘下线前,想在 before_terminate() 中平掉部分仓位,而在 terminate() 中记录最终状态。


高级策略模式让 Jesse 超越了简单的回测工具,成为真正的策略开发平台。多点进出管理解决了分批操作的复杂性,动态仓位更新赋予策略实时响应能力,事件处理提供了细粒度的执行控制,过滤器保持了代码的整洁,前后置钩子则完善了执行流程。掌握这些模式,就能将交易想法精确转化为可执行的策略逻辑。

下一章将深入技术指标的应用,看看如何在策略中高效使用这些分析工具。