优化用户模型和模块租户检查逻辑,新增NATS消息配置和MongoDB缓存控制
This commit is contained in:
65
nats/msg.go
Normal file
65
nats/msg.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/gogf/gf/v2/errors/gerror"
|
||||
)
|
||||
|
||||
// NatsMessageConfig nats Stream 消息配置
|
||||
type NatsMessageConfig struct {
|
||||
CreateTaskStreamName string
|
||||
CreateTaskSubjects []string
|
||||
PublishSubject string
|
||||
CreateTaskConsumerName string
|
||||
MsgCount int
|
||||
HandleFunc func(ctx context.Context, message map[string]interface{}) error
|
||||
}
|
||||
|
||||
// MessageConfig 消息配置接口
|
||||
type MessageConfig interface {
|
||||
createTaskStream(ctx context.Context) error
|
||||
publish(ctx context.Context, data interface{}) error
|
||||
createTaskConsumer(ctx context.Context) error
|
||||
//startConsumer(ctx context.Context, handleFunc func(ctx context.Context, msg *nats.Msg) error) error
|
||||
}
|
||||
|
||||
func (n *NatsMessageConfig) createTaskStream(ctx context.Context) error {
|
||||
return createTaskStreamSimple(ctx, n.CreateTaskStreamName, n.CreateTaskSubjects)
|
||||
}
|
||||
|
||||
// CreateTaskStreamBatch 批量创建任务消息队列流
|
||||
func CreateTaskStreamBatch(ctx context.Context, configs ...MessageConfig) error {
|
||||
for _, cfg := range configs {
|
||||
if err := cfg.createTaskStream(ctx); err != nil {
|
||||
return gerror.Wrap(err, "创建任务消息队列流失败")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NatsMessageConfig) publish(ctx context.Context, data interface{}) error {
|
||||
return publish(ctx, n.PublishSubject, data)
|
||||
}
|
||||
|
||||
// PublishMessage 发布消息(统一入口)
|
||||
func PublishMessage(ctx context.Context, cfg MessageConfig, data interface{}) (err error) {
|
||||
return cfg.publish(ctx, data)
|
||||
}
|
||||
|
||||
func (n *NatsMessageConfig) createTaskConsumer(ctx context.Context) error {
|
||||
return CreateConsumerPushMode(ctx, n.CreateTaskStreamName, n.CreateTaskConsumerName, n.PublishSubject, n.MsgCount)
|
||||
}
|
||||
|
||||
// CreateTaskConsumerBatch 批量创建任务消息队列消费者
|
||||
func CreateTaskConsumerBatch(ctx context.Context, configs ...MessageConfig) error {
|
||||
for _, cfg := range configs {
|
||||
if err := cfg.createTaskConsumer(ctx); err != nil {
|
||||
return gerror.Wrap(err, "创建任务消息队列流失败")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//func (n *NatsMessageConfig) startConsumer(ctx context.Context, handleFunc func(ctx context.Context, msg *nats.Msg) error) error {
|
||||
// return ConsumeMessages(ctx, n.CreateTaskStreamName, n.CreateTaskConsumerName, handleFunc)
|
||||
//}
|
||||
Reference in New Issue
Block a user