Files
common/nats/nats_consumer.go

295 lines
9.5 KiB
Go
Raw Normal View History

package nats
import (
"context"
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go/jetstream"
)
// AckPolicy 确认策略
type AckPolicy string
const (
AckPolicyExplicit AckPolicy = "explicit" // 显式确认(默认)
AckPolicyAll AckPolicy = "all" // 确认所有消息
AckPolicyNone AckPolicy = "none" // 不需要确认
)
// DeliverPolicy 投递策略
type DeliverPolicy string
const (
DeliverPolicyAll DeliverPolicy = "all" // 投递所有消息(包括已投递的)
DeliverPolicyLast DeliverPolicy = "last" // 从最后一条消息开始
DeliverPolicyNew DeliverPolicy = "new" // 仅投递新消息(默认)
DeliverPolicyLastPerSubj DeliverPolicy = "lastpersubj" // 每个主题的最后一条
DeliverPolicyByStartSeq DeliverPolicy = "by_start_sequence" // 按起始序列号
)
// ReplayPolicy 重放策略
type ReplayPolicy string
const (
ReplayPolicyInstant ReplayPolicy = "instant" // 立即重放
ReplayPolicyOriginal ReplayPolicy = "original" // 按原始顺序重放
)
// ConsumerConfig 消费者配置
type ConsumerConfig struct {
DurableName string // 持久化名称(空表示临时消费者)
Description string // 描述信息
AckPolicy AckPolicy // 确认策略
AckWait int // 确认等待时间(秒)
MaxDeliver int // 最大投递次数
FilterSubject string // 过滤主题(流内多主题时使用)
DeliverPolicy DeliverPolicy // 投递策略
ReplayPolicy ReplayPolicy // 重放策略
MaxWaiting int // 最大等待消息数
MaxAckPending int // 最大待确认消息数
OptStartTime int64 // 起始时间戳
OptStartSeq uint64 // 起始序列号
HeadersOnly bool // 仅消费消息头
Backoff []int // 退避策略(秒数数组)
RateLimit uint64 // 消息速率限制(消息/秒)
Replica int // 副本数
FlowControl bool // 启用流控
Metadata map[string]string // 元数据
}
// parseAckPolicy 解析确认策略
func parseAckPolicy(policy AckPolicy) jetstream.AckPolicy {
switch policy {
case AckPolicyAll:
return jetstream.AckAllPolicy
case AckPolicyNone:
return jetstream.AckNonePolicy
default:
return jetstream.AckExplicitPolicy
}
}
// parseDeliverPolicy 解析投递策略
func parseDeliverPolicy(policy DeliverPolicy) jetstream.DeliverPolicy {
switch policy {
case DeliverPolicyAll:
return jetstream.DeliverAllPolicy
case DeliverPolicyLast:
return jetstream.DeliverLastPolicy
case DeliverPolicyLastPerSubj:
return jetstream.DeliverLastPerSubjectPolicy
case DeliverPolicyByStartSeq:
return jetstream.DeliverByStartSequencePolicy
default:
return jetstream.DeliverNewPolicy
}
}
// parseReplayPolicy 解析重放策略
func parseReplayPolicy(policy ReplayPolicy) jetstream.ReplayPolicy {
switch policy {
case ReplayPolicyOriginal:
return jetstream.ReplayOriginalPolicy
default:
return jetstream.ReplayInstantPolicy
}
}
// CreateTaskConsumer 创建任务消费者
// 核心设计思路:
// 1. 显式确认:确保消息被正确处理后才确认
// 2. 重试机制:通过 MaxDeliver 控制最大重试次数
// 3. 持久化DurableName 确保消费者状态持久化
// 4. 流控:防止消费者过载
func CreateTaskConsumer(ctx context.Context, streamName string, config ConsumerConfig) (jetstream.Consumer, error) {
if !IsConnected() {
return nil, fmt.Errorf("NATS 未连接")
}
if streamName == "" {
return nil, fmt.Errorf("流名称不能为空")
}
// 设置默认值
if config.AckPolicy == "" {
config.AckPolicy = AckPolicyExplicit // 默认显式确认
}
if config.AckWait == 0 {
config.AckWait = 30 // 默认30秒确认超时
}
if config.MaxDeliver == 0 {
config.MaxDeliver = 3 // 默认最多投递3次
}
if config.DeliverPolicy == "" {
config.DeliverPolicy = DeliverPolicyNew // 默认仅消费新消息
}
if config.ReplayPolicy == "" {
config.ReplayPolicy = ReplayPolicyInstant // 默认立即重放
}
if config.MaxAckPending == 0 {
config.MaxAckPending = 1000 // 默认最多1000条待确认消息
}
// 构建消费者配置
jsConfig := jetstream.ConsumerConfig{
Name: config.DurableName,
Description: config.Description,
AckPolicy: parseAckPolicy(config.AckPolicy),
AckWait: 0,
MaxDeliver: config.MaxDeliver,
FilterSubjects: []string{config.FilterSubject},
DeliverPolicy: parseDeliverPolicy(config.DeliverPolicy),
ReplayPolicy: parseReplayPolicy(config.ReplayPolicy),
MaxWaiting: config.MaxWaiting,
MaxAckPending: config.MaxAckPending,
HeadersOnly: config.HeadersOnly,
RateLimit: config.RateLimit,
Replicas: config.Replica,
Metadata: config.Metadata,
}
// 配置流控和心跳
if config.FlowControl {
jsConfig.FlowControl = true
}
// 配置起始位置
if config.OptStartSeq > 0 {
jsConfig.OptStartSeq = config.OptStartSeq
}
// 创建新消费者
consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, jsConfig)
if err != nil {
return nil, fmt.Errorf("创建消费者失败: %w", err)
}
// 记录配置信息
configInfo := fmt.Sprintf("确认策略=%s, 最大投递=%d, 投递策略=%s", config.AckPolicy, config.MaxDeliver, config.DeliverPolicy)
if config.FilterSubject != "" {
configInfo += fmt.Sprintf(", 过滤主题=%s", config.FilterSubject)
}
g.Log().Infof(ctx, "✅ 任务消费者创建成功: %s/%s (%s)", streamName, config.DurableName, configInfo)
return consumer, nil
}
// CreateConsumerSimple 简化版创建消费者(适用于大多数场景)
// 只需提供流名称和消费者名称,其他使用默认配置
func CreateConsumerSimple(ctx context.Context, streamName, durableName string) (err error) {
_, err = CreateTaskConsumer(ctx, streamName, ConsumerConfig{
DurableName: durableName,
})
return
}
// CreateConsumerWithFilter 创建带主题过滤的消费者
//func CreateConsumerWithFilter(ctx context.Context, streamName, durableName, filterSubject string) (jetstream.Consumer, error) {
// return CreateTaskConsumer(ctx, streamName, ConsumerConfig{
// DurableName: durableName,
// FilterSubject: filterSubject,
// })
//}
// CreateConsumerEphemeral 创建临时消费者
// 临时消费者没有持久化名称,连接断开后自动删除
//func CreateConsumerEphemeral(ctx context.Context, streamName string) (jetstream.Consumer, error) {
// if !IsConnected() {
// return nil, fmt.Errorf("NATS 未连接")
// }
//
// jsConfig := jetstream.ConsumerConfig{
// AckPolicy: jetstream.AckNonePolicy,
// AckWait: 0,
// MaxDeliver: 3,
// DeliverPolicy: jetstream.DeliverNewPolicy,
// ReplayPolicy: jetstream.ReplayInstantPolicy,
// MaxAckPending: 1000,
// }
//
// consumer, err := js.CreateConsumer(ctx, streamName, jsConfig)
// if err != nil {
// return nil, fmt.Errorf("创建临时消费者失败: %w", err)
// }
//
// g.Log().Infof(ctx, "✅ 临时消费者创建成功: %s", streamName)
// return consumer, nil
//}
// CreateConsumerPushMode 创建推送模式消费者
// 推送模式下NATS 服务器主动将消息推送给消费者
func CreateConsumerPushMode(ctx context.Context, streamName, durableName, subject string, msgCount int) (err error) {
_, err = CreateTaskConsumer(ctx, streamName, ConsumerConfig{
DurableName: durableName,
FilterSubject: subject,
MaxAckPending: msgCount,
})
return
}
// CreateConsumerPullMode 创建拉取模式消费者
// 拉取模式下,消费者主动从服务器拉取消息
//func CreateConsumerPullMode(ctx context.Context, streamName, durableName string) (jetstream.Consumer, error) {
// return CreateTaskConsumer(ctx, streamName, ConsumerConfig{
// DurableName: durableName,
// DeliverPolicy: DeliverPolicyAll,
// MaxAckPending: 500, // 拉取模式下待确认消息数可以设置小一些
// })
//}
// ConsumeMessages 消费消息(推送模式)
func ConsumeMessages(ctx context.Context, streamName, consumerName string, handler jetstream.MessageHandler) error {
if !IsConnected() {
return fmt.Errorf("NATS 未连接")
}
// 获取消费者
consumer, err := js.Consumer(ctx, streamName, consumerName)
if err != nil {
return fmt.Errorf("获取消费者失败: %w", err)
}
// 业务处理
//if err := handler(ctx, streamMsg.Values); err != nil {
// glog.Infof(ctx, "业务处理失败-> err:%v\n", err)
// continue
//}
//// 确认消息
//if msg.AutoAck {
// err := ackMessage(ctx, msg.StreamKey, msg.GroupName, streamMsg.ID)
// if err != nil {
// glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", msg.ConsumerName, streamMsg.ID, err)
// }
//}
//// 创建消息处理函数
//handler = func(msg jetstream.Msg) {
// // 解析消息
// var task TaskMessage
// if err := json.Unmarshal(msg.Data(), &task); err != nil {
// g.Log().Errorf(ctx, "解析消息失败: %v", err)
// msg.Nak() // 拒绝消息,触发重试
// return
// }
//
// // 处理业务逻辑
// g.Log().Infof(ctx, "处理任务: %s", task.TaskID)
//
// // 处理成功,确认消息
// msg.Ack()
//}
// 开始消费
_, err = consumer.Consume(handler)
if err != nil {
return fmt.Errorf("开始消费失败: %w", err)
}
g.Log().Infof(ctx, "✅ 开始消费消息: %s/%s", streamName, consumerName)
return nil
}
// 定义消息结构
type TaskMessage struct {
TaskID string `json:"task_id"`
TaskType string `json:"task_type"`
Data string `json:"data"`
}