2026-01-29 13:57:50 +08:00
|
|
|
|
package message
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"context"
|
|
|
|
|
|
"encoding/json"
|
|
|
|
|
|
"fmt"
|
|
|
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
2026-01-31 05:17:14 +08:00
|
|
|
|
"github.com/nats-io/nats.go"
|
|
|
|
|
|
"time"
|
2026-01-29 13:57:50 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
2026-01-31 05:17:14 +08:00
|
|
|
|
type NatsPublishMsgConfig struct {
|
2026-02-04 13:49:17 +08:00
|
|
|
|
QueueName string
|
|
|
|
|
|
Durable bool
|
|
|
|
|
|
Data any
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
type NatsPublishDelayMsgConfig struct {
|
2026-01-31 05:17:14 +08:00
|
|
|
|
QueueName string
|
|
|
|
|
|
Durable bool
|
|
|
|
|
|
DelayTime int
|
|
|
|
|
|
Data any
|
2026-01-29 13:57:50 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-31 05:17:14 +08:00
|
|
|
|
type NatsSubscribeMsgConfig struct {
|
|
|
|
|
|
QueueName string
|
2026-02-04 13:49:17 +08:00
|
|
|
|
ConsumerName string
|
2026-01-31 05:17:14 +08:00
|
|
|
|
Durable bool
|
|
|
|
|
|
DelayTime int
|
|
|
|
|
|
AutoAck bool
|
|
|
|
|
|
PrefetchCount int
|
|
|
|
|
|
HandleFunc func(ctx context.Context, message map[string]interface{}) error
|
2026-01-29 13:57:50 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-31 05:17:14 +08:00
|
|
|
|
func (*NatsPublishMsgConfig) GetPublishMsgType() {
|
2026-01-29 13:57:50 +08:00
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-02-04 13:49:17 +08:00
|
|
|
|
func (*NatsPublishDelayMsgConfig) GetPublishDelayMsgType() {
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-31 05:17:14 +08:00
|
|
|
|
func (*NatsSubscribeMsgConfig) GetSubscribeMsgType() {
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-02-04 13:49:17 +08:00
|
|
|
|
type natsMsg struct {
|
|
|
|
|
|
name string // 数据源名称
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-31 05:17:14 +08:00
|
|
|
|
func init() {
|
2026-02-04 13:49:17 +08:00
|
|
|
|
// 注册 Nats 插件(默认数据源)
|
|
|
|
|
|
RegisterPlugin(context.Background(), "default", MessageNATS, func() messageUtil {
|
|
|
|
|
|
return &natsMsg{name: "default"}
|
2026-01-31 05:17:14 +08:00
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-02-04 13:49:17 +08:00
|
|
|
|
// Connect 连接 NATS
|
|
|
|
|
|
func (c *natsMsg) Connect(ctx context.Context) error {
|
|
|
|
|
|
return natsConnect(ctx, c.name)
|
2026-01-31 05:17:14 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-02-04 13:49:17 +08:00
|
|
|
|
// Ping 检测 NATS 连接状态
|
|
|
|
|
|
func (c *natsMsg) Ping(ctx context.Context) bool {
|
|
|
|
|
|
return natsPing(ctx, c.name)
|
2026-01-31 05:17:14 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Close 关闭 NATS 连接
|
2026-02-04 13:49:17 +08:00
|
|
|
|
func (c *natsMsg) Close(ctx context.Context) error {
|
|
|
|
|
|
return natsClose(ctx, c.name)
|
2026-01-31 05:17:14 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Publish 发布消息
|
|
|
|
|
|
func (c *natsMsg) Publish(ctx context.Context, msgConfig messagePublishConfig) error {
|
|
|
|
|
|
cfg, ok := msgConfig.(*NatsPublishMsgConfig)
|
2026-01-29 13:57:50 +08:00
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("无效的 NATS 配置类型")
|
|
|
|
|
|
}
|
2026-01-31 05:17:14 +08:00
|
|
|
|
if g.IsEmpty(cfg.QueueName) {
|
|
|
|
|
|
return fmt.Errorf("必须提供队列名称")
|
|
|
|
|
|
}
|
|
|
|
|
|
if g.IsEmpty(cfg.Data) {
|
|
|
|
|
|
return fmt.Errorf("必须提供数据")
|
|
|
|
|
|
}
|
2026-02-04 13:49:17 +08:00
|
|
|
|
return c.createPublish(ctx, cfg.QueueName, cfg.Durable, 0, cfg.Data)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// PublishDelay 发布延迟消息
|
|
|
|
|
|
func (c *natsMsg) PublishDelay(ctx context.Context, msgConfig messagePublishDelayConfig) error {
|
|
|
|
|
|
cfg, ok := msgConfig.(*NatsPublishDelayMsgConfig)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("无效的 NATS 配置类型")
|
|
|
|
|
|
}
|
|
|
|
|
|
if g.IsEmpty(cfg.QueueName) {
|
|
|
|
|
|
return fmt.Errorf("必须提供队列名称")
|
|
|
|
|
|
}
|
|
|
|
|
|
if g.IsEmpty(cfg.DelayTime) {
|
|
|
|
|
|
return fmt.Errorf("延迟时间必须大于 0")
|
|
|
|
|
|
}
|
|
|
|
|
|
if g.IsEmpty(cfg.Data) {
|
|
|
|
|
|
return fmt.Errorf("必须提供数据")
|
|
|
|
|
|
}
|
2026-01-31 05:17:14 +08:00
|
|
|
|
return c.createPublish(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Publish 发布消息
|
|
|
|
|
|
func (c *natsMsg) createPublish(ctx context.Context, subject string, durable bool, delayTime int, data any) error {
|
|
|
|
|
|
delayMsg := delayTime > 0
|
2026-02-04 13:49:17 +08:00
|
|
|
|
if err := c.createStream(ctx, subject, durable, delayMsg); err != nil {
|
2026-01-29 14:23:50 +08:00
|
|
|
|
return err
|
|
|
|
|
|
}
|
2026-01-29 13:57:50 +08:00
|
|
|
|
payload, err := json.Marshal(data)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("序列化数据失败: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-02-04 13:49:17 +08:00
|
|
|
|
m := nats.NewMsg(subject)
|
|
|
|
|
|
m.Data = payload // 所有消息都需要设置数据
|
2026-01-31 05:17:14 +08:00
|
|
|
|
|
|
|
|
|
|
if delayMsg {
|
2026-02-04 13:49:17 +08:00
|
|
|
|
// 使用 @at 指定具体延迟时间,而不是 @every 重复执行
|
|
|
|
|
|
futureTime := time.Now().Add(time.Duration(delayTime) * time.Second).Format(time.RFC3339Nano)
|
|
|
|
|
|
m.Header.Set("Nats-Schedule", fmt.Sprintf("@at %s", futureTime))
|
|
|
|
|
|
m.Subject = subject + ".schedule"
|
|
|
|
|
|
m.Header.Set("Nats-Schedule-Target", subject)
|
|
|
|
|
|
g.Log().Infof(ctx, "📅 NATS 延迟消息配置: DelayTime=%ds, Schedule=@at %s, Header=%s", delayTime, futureTime, m.Header)
|
2026-01-31 05:17:14 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-29 13:57:50 +08:00
|
|
|
|
// 发布消息到 JetStream
|
2026-02-04 13:49:17 +08:00
|
|
|
|
js := getNatsJS(c.name)
|
|
|
|
|
|
if js == nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "❌ NATS [%s] JetStream 不存在", c.name)
|
|
|
|
|
|
return fmt.Errorf("NATS JetStream 不存在")
|
2026-01-29 13:57:50 +08:00
|
|
|
|
}
|
2026-02-04 13:49:17 +08:00
|
|
|
|
ack, err := js.PublishMsg(m)
|
2026-01-31 05:17:14 +08:00
|
|
|
|
if err != nil {
|
2026-02-04 13:49:17 +08:00
|
|
|
|
g.Log().Errorf(ctx, "❌ NATS 发布消息失败: err=%v, Subject=%s", err, m.Subject)
|
|
|
|
|
|
return err
|
2026-01-31 05:17:14 +08:00
|
|
|
|
}
|
2026-02-04 13:49:17 +08:00
|
|
|
|
g.Log().Infof(ctx, "✅ NATS 发布消息成功: Stream=%v, StreamSeq=%d", ack.Stream, ack.Sequence)
|
2026-01-29 13:57:50 +08:00
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-31 05:17:14 +08:00
|
|
|
|
// Subscribe 订阅消息
|
|
|
|
|
|
func (c *natsMsg) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error {
|
|
|
|
|
|
cfg, ok := msgConfig.(*NatsSubscribeMsgConfig)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("无效的 NATS 配置类型")
|
|
|
|
|
|
}
|
|
|
|
|
|
if g.IsEmpty(cfg.QueueName) {
|
|
|
|
|
|
return fmt.Errorf("必须提供队列名称")
|
|
|
|
|
|
}
|
|
|
|
|
|
if g.IsEmpty(cfg.ConsumerName) {
|
|
|
|
|
|
return fmt.Errorf("必须提供消费者名称")
|
|
|
|
|
|
}
|
|
|
|
|
|
if g.IsEmpty(cfg.HandleFunc) {
|
|
|
|
|
|
return fmt.Errorf("必须提供处理函数")
|
|
|
|
|
|
}
|
|
|
|
|
|
if g.IsEmpty(cfg.PrefetchCount) {
|
|
|
|
|
|
cfg.PrefetchCount = 1
|
|
|
|
|
|
}
|
2026-02-04 13:49:17 +08:00
|
|
|
|
return c.createSubscribe(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.DelayTime, cfg.AutoAck, cfg.Durable, cfg.HandleFunc)
|
2026-01-29 13:57:50 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-31 05:17:14 +08:00
|
|
|
|
// createSubscribe 内部订阅消息
|
2026-02-04 13:49:17 +08:00
|
|
|
|
func (c *natsMsg) createSubscribe(ctx context.Context, subject, consumerName string, prefetchCount, delayTime int, autoAck, durable bool, handler func(ctx context.Context, message map[string]any) error) error {
|
2026-01-31 05:17:14 +08:00
|
|
|
|
g.Log().Infof(ctx, "🔔 NATS 开始订阅: QueueName=%s, ConsumerName=%s", subject, consumerName)
|
2026-02-04 13:49:17 +08:00
|
|
|
|
// 创建推送订阅的回调函数
|
|
|
|
|
|
msgHandler := func(msg *nats.Msg) {
|
2026-01-29 13:57:50 +08:00
|
|
|
|
var data map[string]any
|
2026-02-04 13:49:17 +08:00
|
|
|
|
if err := json.Unmarshal(msg.Data, &data); err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "❌ 解析消息失败: %v", err)
|
2026-01-29 13:57:50 +08:00
|
|
|
|
return
|
|
|
|
|
|
}
|
2026-02-04 13:49:17 +08:00
|
|
|
|
g.Log().Infof(ctx, "📨 收到消息: Subject=%s, Data=%v", msg.Subject, data)
|
|
|
|
|
|
|
2026-01-29 13:57:50 +08:00
|
|
|
|
// 处理业务逻辑
|
|
|
|
|
|
if err := handler(ctx, data); err != nil {
|
2026-02-04 13:49:17 +08:00
|
|
|
|
g.Log().Errorf(ctx, "❌ 处理消息失败: %v", err)
|
|
|
|
|
|
if !autoAck {
|
|
|
|
|
|
if err := msg.Nak(); err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "❌ Nak 失败: %v", err)
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
return
|
2026-01-31 05:17:14 +08:00
|
|
|
|
}
|
2026-02-04 13:49:17 +08:00
|
|
|
|
} else {
|
|
|
|
|
|
g.Log().Infof(ctx, "✅ 处理消息成功")
|
2026-01-29 13:57:50 +08:00
|
|
|
|
}
|
2026-02-04 13:49:17 +08:00
|
|
|
|
if err := msg.Ack(); err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "❌ Ack 失败: %v", err)
|
2026-01-29 13:57:50 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
2026-02-04 13:49:17 +08:00
|
|
|
|
delayMsg := delayTime > 0
|
|
|
|
|
|
// 创建流
|
|
|
|
|
|
if err := c.createStream(ctx, subject, durable, delayMsg); err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
// 获取 JetStream 上下文
|
|
|
|
|
|
js := getNatsJS(c.name)
|
|
|
|
|
|
if js == nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "❌ NATS [%s] JetStream 不存在", c.name)
|
|
|
|
|
|
return fmt.Errorf("NATS JetStream 不存在")
|
|
|
|
|
|
}
|
|
|
|
|
|
// 创建推送订阅
|
|
|
|
|
|
var sub *nats.Subscription
|
|
|
|
|
|
var err error
|
|
|
|
|
|
// 配置订阅选项 - 使用 DeliverSubject 创建 Push Consumer
|
|
|
|
|
|
subOpts := []nats.SubOpt{
|
|
|
|
|
|
nats.Durable(consumerName),
|
|
|
|
|
|
nats.MaxAckPending(prefetchCount),
|
|
|
|
|
|
nats.DeliverSubject(consumerName),
|
|
|
|
|
|
}
|
|
|
|
|
|
if !autoAck {
|
|
|
|
|
|
subOpts = append(subOpts, nats.ManualAck())
|
|
|
|
|
|
}
|
|
|
|
|
|
// 使用 Subscribe 创建推送订阅
|
|
|
|
|
|
sub, err = js.Subscribe(subject, msgHandler, subOpts...)
|
2026-01-29 13:57:50 +08:00
|
|
|
|
if err != nil {
|
2026-02-04 13:49:17 +08:00
|
|
|
|
g.Log().Errorf(ctx, "创建推送订阅失败: %v", err)
|
|
|
|
|
|
return err
|
2026-01-29 13:57:50 +08:00
|
|
|
|
}
|
2026-02-04 13:49:17 +08:00
|
|
|
|
g.Log().Infof(ctx, "✅ NATS 推送订阅成功: Consumer=%s", consumerName)
|
|
|
|
|
|
// 启动后台 goroutine 监听上下文取消,用于清理订阅
|
|
|
|
|
|
go func() {
|
|
|
|
|
|
<-ctx.Done()
|
|
|
|
|
|
g.Log().Infof(ctx, "订阅上下文取消,取消订阅")
|
|
|
|
|
|
if err := sub.Unsubscribe(); err != nil {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
}()
|
2026-01-29 13:57:50 +08:00
|
|
|
|
|
2026-02-04 13:49:17 +08:00
|
|
|
|
return nil
|
|
|
|
|
|
}
|
2026-01-29 13:57:50 +08:00
|
|
|
|
|
2026-02-04 13:49:17 +08:00
|
|
|
|
// createStream 内部创建消费组
|
|
|
|
|
|
func (c *natsMsg) createStream(ctx context.Context, subject string, durable, delayMsg bool) error {
|
|
|
|
|
|
streamName, storage := getStreamInfo(durable, delayMsg)
|
|
|
|
|
|
// 构建流配置
|
|
|
|
|
|
// 如果是延迟消息,需要包含两个 subjects:
|
|
|
|
|
|
// 1. subject.schedule - 用于发送调度消息
|
|
|
|
|
|
// 2. subject - 用于实际投递目标
|
|
|
|
|
|
subjects := []string{subject}
|
|
|
|
|
|
if delayMsg {
|
|
|
|
|
|
subjects = []string{subject, subject + ".schedule"}
|
|
|
|
|
|
}
|
|
|
|
|
|
jsConfig := &StreamConfig{
|
|
|
|
|
|
Name: streamName,
|
|
|
|
|
|
Subjects: subjects,
|
|
|
|
|
|
AllowMsgSchedules: delayMsg, // 延迟消息核心开关
|
|
|
|
|
|
Storage: storage,
|
|
|
|
|
|
Discard: DiscardNew, // 达到上限删除旧消息
|
|
|
|
|
|
}
|
|
|
|
|
|
nc := getNatsConn(c.name)
|
|
|
|
|
|
if !c.Ping(ctx) {
|
|
|
|
|
|
// 使用统一的重连函数
|
|
|
|
|
|
if err := commonConnect(ctx, MessageNATS, c.name, func(ctx context.Context) error {
|
|
|
|
|
|
return c.Connect(ctx)
|
|
|
|
|
|
}, func(ctx context.Context) error {
|
|
|
|
|
|
return c.Close(ctx)
|
|
|
|
|
|
}); err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageNATS, c.name, err)
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
if nc == nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "❌ NATS [%s] 连接不存在", c.name)
|
|
|
|
|
|
return fmt.Errorf("NATS 连接不存在")
|
|
|
|
|
|
}
|
|
|
|
|
|
err := jsStreamCreate(nc, jsConfig)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "❌ 创建 Stream 失败: err=%v", err)
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
g.Log().Infof(ctx, "✅ 创建 Stream 成功: stream=%s, subjects=%v, allowSchedules=%v", streamName, subjects, delayMsg)
|
2026-01-29 13:57:50 +08:00
|
|
|
|
return nil
|
|
|
|
|
|
}
|
2026-01-31 05:17:14 +08:00
|
|
|
|
|
2026-02-04 13:49:17 +08:00
|
|
|
|
func getStreamInfo(durable, delayMsg bool) (string, StorageType) {
|
2026-01-31 05:17:14 +08:00
|
|
|
|
// Stream 不存在,创建新的
|
|
|
|
|
|
streamName := "ordinary_msg_memory"
|
2026-02-04 13:49:17 +08:00
|
|
|
|
storage := MemoryStorage
|
2026-01-31 05:17:14 +08:00
|
|
|
|
|
|
|
|
|
|
// 延迟消息必须使用 FileStorage(NATS 官方要求)
|
|
|
|
|
|
if delayMsg {
|
2026-02-04 13:49:17 +08:00
|
|
|
|
if durable {
|
|
|
|
|
|
streamName = "delay_msg_file"
|
|
|
|
|
|
storage = FileStorage
|
|
|
|
|
|
} else {
|
|
|
|
|
|
streamName = "delay_msg_memory"
|
|
|
|
|
|
storage = MemoryStorage
|
|
|
|
|
|
}
|
2026-01-31 05:17:14 +08:00
|
|
|
|
} else {
|
|
|
|
|
|
if durable {
|
|
|
|
|
|
streamName = "ordinary_msg_file"
|
2026-02-04 13:49:17 +08:00
|
|
|
|
storage = FileStorage
|
2026-01-31 05:17:14 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
return streamName, storage
|
|
|
|
|
|
}
|
2026-02-04 13:49:17 +08:00
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
|
// JSApiStreamCreateT is the endpoint to create new streams.
|
|
|
|
|
|
// Will return JSON response.
|
|
|
|
|
|
JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s"
|
|
|
|
|
|
|
|
|
|
|
|
// JSApiStreamUpdateT is the endpoint to update existing streams.
|
|
|
|
|
|
// Will return JSON response.
|
|
|
|
|
|
JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// jsStreamCreate is for sending a stream create for fields that nats.go does not know about yet.
|
|
|
|
|
|
func jsStreamCreate(nc *nats.Conn, cfg *StreamConfig) error {
|
|
|
|
|
|
j, err := json.Marshal(cfg)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
msg, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), j, time.Second*3)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 检查 API 响应中的错误
|
|
|
|
|
|
var resp struct {
|
|
|
|
|
|
Error *struct {
|
|
|
|
|
|
Code int `json:"code"`
|
|
|
|
|
|
ErrCode int `json:"err_code"`
|
|
|
|
|
|
Description string `json:"description"`
|
|
|
|
|
|
} `json:"error,omitempty"`
|
|
|
|
|
|
}
|
|
|
|
|
|
if err := json.Unmarshal(msg.Data, &resp); err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
if resp.Error != nil {
|
|
|
|
|
|
// 如果 Stream 已存在,尝试更新
|
|
|
|
|
|
if resp.Error.ErrCode == 10058 { // JSStreamNameExistErr
|
|
|
|
|
|
return jsStreamUpdate(nc, cfg)
|
|
|
|
|
|
}
|
|
|
|
|
|
return fmt.Errorf("JS API error: %s", resp.Error.Description)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// jsStreamUpdate is for sending a stream create for fields that nats.go does not know about yet.
|
|
|
|
|
|
func jsStreamUpdate(nc *nats.Conn, cfg *StreamConfig) error {
|
|
|
|
|
|
j, err := json.Marshal(cfg)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
msg, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), j, time.Second*3)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 检查 API 响应中的错误
|
|
|
|
|
|
var resp struct {
|
|
|
|
|
|
Error *struct {
|
|
|
|
|
|
Code int `json:"code"`
|
|
|
|
|
|
ErrCode int `json:"err_code"`
|
|
|
|
|
|
Description string `json:"description"`
|
|
|
|
|
|
} `json:"error,omitempty"`
|
|
|
|
|
|
}
|
|
|
|
|
|
if err := json.Unmarshal(msg.Data, &resp); err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
if resp.Error != nil {
|
|
|
|
|
|
return fmt.Errorf("JS API error: %s", resp.Error.Description)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|