6.高级策略模式
在掌握了基础的交易进出逻辑后,策略开发往往会遇到更复杂的需求。比如,想在不同价位分批建仓,或者根据市场变化动态调整止盈止损,甚至让多个交易对之间产生联动。Jesse 的设计哲学是"简单事情简单做,复杂事情可行",这些高级模式正是为了解决实际交易中的精细控制需求。
多点进出管理
传统策略通常是一次性开仓、一次性平仓。但真实市场往往需要更灵活的操作:突破时先建半仓,回调再加半仓;或者盈利时分批止盈,让利润奔跑的同时锁定部分收益。
分批出场
在 go_long() 或 go_short() 中,self.take_profit 和 self.stop_loss 支持列表语法。每个元素是一个元组,包含(数量,价格)。Jesse 会自动处理这些订单,当价格触发时执行对应的平仓操作。
def go_long(self):
qty = 2
self.buy = qty, self.price
# 在 120 和 140 两个价位分批止盈
self.take_profit = [
(qty / 2, 120), # 第一单:平掉 1 个仓位
(qty / 2, 140) # 第二单:平掉剩余 1 个仓位
]
self.stop_loss = qty, 80 # 全部仓位统一止损
这段代码展示了最经典的分批止盈策略。入场后,系统会同时挂出两个止盈单和一个止损单。当价格达到 120 时,自动平仓一半;达到 140 时,平仓另一半。止损单始终保护全部仓位。这种设计让策略逻辑清晰,不需要手动跟踪哪个订单已执行。
分批入场
同样原理适用于分批建仓。通过列表语法,可以在不同价位逐步建立头寸。
def go_long(self):
# 计划分两次买入,每次 1 个单位
self.buy = [
(1, 120), # 第一笔:价格达到 120 时买入
(1, 140) # 第二笔:价格达到 140 时再买入
]
self.stop_loss = 2, 100 # 全部仓位统一止损
self.take_profit = 2, 160
这种模式下,Jesse 会先提交第一个限价单。当 120 的订单成交后,系统会自动提交第二个 140 的订单。如果价格直接上涨,可能只成交第一笔;如果回调,两笔都可能成交。reduced_count 属性在这里派上用场,它记录了仓位被减少的次数。
def go_long(self):
self.buy = 2, 100
self.take_profit = [
(1, 120),
(1, 140)
]
def update_position(self):
# 当第一笔止盈成交后,调整剩余仓位的止损到成本价
if self.reduced_count > 0:
self.stop_loss = 1, 100
reduced_count 从 0 开始,每次止盈或止损成交导致仓位减少时,这个值就会增加。利用它,可以实现"移动止损到保本"的经典风控手法。第一笔止盈后,将剩余仓位的止损价调整到入场价,确保后续波动不会亏损。
动态调整未成交订单
有时入场后,市场结构发生变化,需要取消之前的挂单。should_cancel_entry() 方法专门处理这个场景。
def should_long(self):
return True
def go_long(self):
# 在最高价上方 2 个点挂突破单
self.buy = 1, self.high + 2
def should_cancel_entry(self):
# 如果新蜡烛形成后,价格反而下跌,取消旧订单
return self.close < self.open
这个例子中,策略试图捕捉向上突破。但如果市场未能延续强势,反而收阴线,说明突破可能失败。此时返回 True 取消挂单的止损单,避免在弱势中追高。注意,这个方法只影响入场订单,不影响已成交仓位后的止盈止损。
动态仓位更新
市场瞬息万变,预先设定的止盈止损有时需要调整。update_position() 方法在每次新蜡烛到来时,只要仓位是打开的,就会被调用。这是实现移动止损、动态止盈、加仓减仓的核心机制。
移动止盈止损
趋势跟踪策略的典型做法是让利润奔跑,同时用移动止损保护收益。
def update_position(self):
qty = self.position.qty
# 多头仓位:止盈设在当前蜡烛高点下方 10 个点
if self.is_long:
self.take_profit = qty, self.high - 10
# 空头仓位:止盈设在当前蜡烛低点上方 10 个点
elif self.is_short:
self.take_profit = qty, self.low + 10
每根新蜡烛都会重新计算止盈价。对于多头,如果价格不断创出新高,止盈价也会跟随上移;如果价格回落,止盈价保持不变(因为 self.high 是新的高点)。这样就实现了"只上移不下移"的移动止盈逻辑。
条件平仓
有时需要根据指标信号主动平仓,而不是等待价格触及预设点位。
def update_position(self):
# 多头仓位且 RSI 达到超买区,立即平仓
if self.is_long and ta.rsi(self.candles) > 80:
self.liquidate()
liquidate() 是平仓的快捷方法,它会以市价单平掉所有仓位,并自动取消相关订单。这种用法在震荡策略中很常见:突破入场,但指标显示超买超卖时提前离场,不等待止损触发。
浮盈加仓
金字塔加仓是经典手法,但风险较高。Jesse 允许在 update_position() 中继续下单。
def update_position(self):
# 多头仓位盈利超过 5%,且 RSI 显示超卖,加仓一倍
if self.is_long and self.position.pnl_percentage > 5:
if ta.rsi(self.candles) < 30:
self.buy = self.position.qty, self.price
这里在原有仓位盈利的基础上,遇到回调(RSI 超卖)时加倍仓位。self.position.qty 获取当前仓位数量,买入相同数量就实现了仓位翻倍。注意,加仓后需要重新设置止损止盈,否则新仓位没有保护。
仓位事件处理
事件驱动是 Jesse 的一大特色。当仓位状态发生变化时,系统会自动调用对应的事件方法。这让策略能精确响应市场执行,而不是被动等待下一根蜡烛。
基础事件
on_open_position 在入场订单成交瞬间触发,此时 self.position 已经可用。
def on_open_position(self, order):
# 记录开仓信息
self.log(f"仓位已开,入场价: {self.position.entry_price}, 数量: {self.position.qty}")
on_close_position 在仓位完全平仓时触发。通过 order 参数可以判断是止盈还是止损。
def on_close_position(self, order):
if order.is_take_profit:
self.log("止盈平仓")
elif order.is_stop_loss:
self.log("止损平仓")
on_reduced_position 在仓位部分平仓时触发,比如分批止盈成交了一半。
def go_long(self):
self.buy = 2, 100
self.take_profit = [
(1, 120),
(1, 140)
]
def on_reduced_position(self, order):
# 第一笔止盈成交后,将止损移动到成本价
self.stop_loss = 1, 100
on_increased_position 在加仓时触发,无论是 go_long() 中的分批入场,还是 update_position() 中的动态加仓。
def on_increased_position(self, order):
# 加仓后,重新计算总仓位的止损
total_qty = self.position.qty
self.stop_loss = total_qty, self.position.entry_price * 0.95
跨策略通信事件
多品种策略需要协同操作时,Jesse 提供了路由事件。假设同时交易 BTC 和 ETH,当 ETH 开仓时,想平掉 BTC 的仓位。
def on_route_open_position(self, strategy):
# 当 ETH-USDT 策略开仓时,平掉当前策略的仓位
if self.is_open and strategy.symbol == 'ETH-USDT':
self.liquidate()
类似的事件还有 on_route_close_position、on_route_increased_position、on_route_reduced_position、on_route_canceled。这些事件接收 strategy 参数,可以访问其他策略的符号、仓位等信息。
跨策略通信需要配合 self.shared_vars 共享数据。比如配对交易策略,一个策略做多 A,另一个做空 B,通过共享变量传递信号。
def go_long(self):
# 开仓时标记信号
self.shared_vars['signal'] = 'long'
def on_route_open_position(self, strategy):
# 另一个策略读取信号并反向操作
if self.shared_vars.get('signal') == 'long':
self.sell = 1, self.price
策略过滤器
过滤器是 Jesse 的独特设计,用于将入场条件与过滤条件分离。当策略需要多个条件同时满足,但又不想在 should_long() 里写复杂判断时,过滤器是最佳选择。
基本用法
首先定义 filters() 方法,返回一个过滤器函数列表。然后编写过滤器函数,函数名随意,建议包含 filter。
def filters(self):
return [
self.trend_filter,
self.volatility_filter
]
def trend_filter(self):
# 只在上升趋势中交易
return self.close > ta.ema(self.candles, 50)
def volatility_filter(self):
# 波动率不能太大
return ta.atr(self.candles, 14) < self.close * 0.02
关键点是 filters() 返回的是函数对象,不是调用结果。所以不能加括号,这是新手常犯的错误。
错误写法:
def filters(self):
return [self.trend_filter()] # 这会立即执行,返回布尔值
正确写法:
def filters(self):
return [self.trend_filter] # 返回函数本身,让框架在适当时机调用
过滤器的优势
过滤器能访问入场和出场价格,这是 should_long() 做不到的。因为 should_long() 只决定"是否交易",而具体价格是在 go_long() 中定义的。过滤器在两者之后执行,可以拿到最终计算好的价格。
def minimum_pnl_filter(self):
# 确保每笔交易的预期盈利大于 1%
reward = abs(self.average_take_profit - self.average_entry_price)
pnl_percentage = (reward / self.average_entry_price) * 100
return pnl_percentage > 1
self.average_entry_price 和 self.average_take_profit 是 Jesse 自动计算的平均值,考虑了分批入场和出场的情况。如果策略的盈亏比不达标,即使 should_long() 返回 True,也不会开仓。
另一个优势是调试方便。当过滤器阻止交易时,Jesse 会在日志中记录哪个过滤器未通过,这比在 should_long() 里调试复杂的 if-else 要直观得多。
前后置钩子
before() 和 after() 是策略执行流程的两端,分别在每个蜡烛开始时和结束后调用。它们不直接参与交易决策,而是用于状态管理和数据准备。
before() 前置处理
在新蜡烛数据刚到达,但策略逻辑尚未执行时调用。适合更新自定义变量、预计算指标等。
def before(self):
# 记录当前蜡烛的开盘时间
self.vars['candle_start'] = self.time
# 预计算一些复杂指标,供后续方法使用
self.vars['rsi'] = ta.rsi(self.candles, 14)
self.vars['ema_fast'] = ta.ema(self.candles, 20)
self.vars['ema_slow'] = ta.ema(self.candles, 50)
self.vars 是策略实例的私有字典,生命周期与策略相同。与 self.shared_vars 不同,它不能在策略间共享,适合存放当前策略的状态。
after() 后置处理
在策略所有逻辑执行完毕后调用,包括 update_position() 和事件处理。适合清理状态、记录日志等。
def after(self):
# 检查是否有未处理的订单
if self.orders:
self.log(f"活跃订单数: {len(self.orders)}")
# 更新自定义计数器
if self.is_open:
self.vars['bars_since_entry'] = self.vars.get('bars_since_entry', 0) + 1
else:
self.vars['bars_since_entry'] = 0
这个例子统计了持仓的蜡烛数量。self.orders 返回当前所有活跃订单,可以用来监控挂单状态。
完整执行流程
理解这些钩子的调用顺序对排查问题至关重要。Jesse 的策略执行流程如下:
graph TD
A[新蜡烛到达] --> B[before()]
B --> C{仓位是否打开?}
C -->|否| D[should_long()/should_short()]
D --> E[should_cancel_entry()]
C -->|是| F[update_position()]
F --> G[事件处理]
E --> H[go_long()/go_short()]
H --> I[提交订单]
G --> J[after()]
I --> J
before() 最先执行,after() 最后执行。无论是否有仓位,这两个方法都会被调用。而 should_long()、update_position() 等则根据仓位状态选择性执行。
生命周期管理
策略还有开始和结束的特殊方法,虽然不常用,但在某些场景下很关键。
init() 初始化
策略类实例化时调用一次,适合加载模型、初始化参数。
def __init__(self):
super().__init__()
# 加载机器学习模型
import pickle
with open('model.pkl', 'rb') as f:
self.model = pickle.load(f)
# 初始化自定义变量
self.vars['trade_count'] = 0
必须首先调用 super().__init__(),否则框架初始化会失败。这里可以做一些重量级操作,因为只执行一次。
terminate() 终止处理
回测或实盘结束时调用,适合保存数据、清理资源。
def terminate(self):
# 保存回测统计
import json
stats = {
'total_trades': len(self.trades),
'win_rate': self.win_rate
}
with open('stats.json', 'w') as f:
json.dump(stats, f)
self.log('策略执行结束')
before_terminate() 在 terminate() 之前调用,区别是前者还能提交订单,后者只能做只读操作。比如实盘下线前,想在 before_terminate() 中平掉部分仓位,而在 terminate() 中记录最终状态。
高级策略模式让 Jesse 超越了简单的回测工具,成为真正的策略开发平台。多点进出管理解决了分批操作的复杂性,动态仓位更新赋予策略实时响应能力,事件处理提供了细粒度的执行控制,过滤器保持了代码的整洁,前后置钩子则完善了执行流程。掌握这些模式,就能将交易想法精确转化为可执行的策略逻辑。
下一章将深入技术指标的应用,看看如何在策略中高效使用这些分析工具。