package nats import ( "context" "encoding/json" "fmt" "reflect" "sync" "time" "github.com/gogf/gf/v2/frame/g" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" ) // RPC 服务注册表 var ( rpcServices map[string]RPCHandler rpcSubs map[string]*nats.Subscription // 服务名 -> 订阅 rpcServicesMu sync.RWMutex queueRPCServices map[string]map[string]RPCHandler // queueName -> subject -> handler queueRPCSubs map[string]map[string]*nats.Subscription // queueName -> serviceName -> 订阅 queueRPCMu sync.RWMutex ) // RPCHandler RPC 处理函数类型 // 实现方只需要关注请求参数和返回值,无需了解底层 NATS 实现 type RPCHandler func(ctx context.Context, req []byte) ([]byte, error) // CreateTaskStream 创建任务消息队列流 // 存储策略: 文件存储 // 工作队列模式: 工作队列策略 func CreateTaskStream(ctx context.Context, streamName string, subjects []string) error { if !checkConnected() { return fmt.Errorf("NATS 未连接") } stream, err := js.Stream(ctx, streamName) if err == nil { // 流已存在,更新配置 _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ Name: streamName, Subjects: subjects, Storage: jetstream.FileStorage, Retention: jetstream.WorkQueuePolicy, }) if err != nil { return fmt.Errorf("更新任务流失败: %w", err) } g.Log().Infof(ctx, "✅ 任务消息队列流已更新: %s", stream.CachedInfo().Config.Name) return nil } // 创建新流 stream, err = js.CreateStream(ctx, jetstream.StreamConfig{ Name: streamName, Subjects: subjects, Storage: jetstream.FileStorage, Retention: jetstream.WorkQueuePolicy, }) if err != nil { return fmt.Errorf("创建任务流失败: %w", err) } g.Log().Infof(ctx, "✅ 任务消息队列流创建成功: %s (文件存储+工作队列策略)", stream.CachedInfo().Config.Name) return nil } // CreateLogStream 创建日志流 // 存储策略: 内存存储 // 副本数: 单副本 (1) // 消息留存: 短时留存 (1小时) func CreateLogStream(ctx context.Context, streamName string, subjects []string) error { if !checkConnected() { return fmt.Errorf("NATS 未连接") } maxAge := 1 * time.Hour stream, err := js.Stream(ctx, streamName) if err == nil { // 流已存在,更新配置 _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ Name: streamName, Subjects: subjects, Storage: jetstream.MemoryStorage, Replicas: 1, MaxAge: maxAge, }) if err != nil { return fmt.Errorf("更新日志流失败: %w", err) } g.Log().Infof(ctx, "✅ 日志流已更新: %s", stream.CachedInfo().Config.Name) return nil } // 创建新流 stream, err = js.CreateStream(ctx, jetstream.StreamConfig{ Name: streamName, Subjects: subjects, Storage: jetstream.MemoryStorage, Replicas: 1, MaxAge: maxAge, }) if err != nil { return fmt.Errorf("创建日志流失败: %w", err) } g.Log().Infof(ctx, "✅ 日志流创建成功: %s (内存存储+单副本+短时留存1小时)", stream.CachedInfo().Config.Name) return nil } // CreateTradeStream 创建交易业务流 // 存储策略: 文件存储 // 副本数: 3副本 // 同步刷盘: 启用 func CreateTradeStream(ctx context.Context, streamName string, subjects []string) error { if !checkConnected() { return fmt.Errorf("NATS 未连接") } stream, err := js.Stream(ctx, streamName) if err == nil { // 流已存在,更新配置 _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ Name: streamName, Subjects: subjects, Storage: jetstream.FileStorage, Replicas: 3, RePublish: nil, Duplicates: 0, }) if err != nil { return fmt.Errorf("更新交易流失败: %w", err) } g.Log().Infof(ctx, "✅ 交易业务流已更新: %s", stream.CachedInfo().Config.Name) return nil } // 创建新流 stream, err = js.CreateStream(ctx, jetstream.StreamConfig{ Name: streamName, Subjects: subjects, Storage: jetstream.FileStorage, Replicas: 3, RePublish: nil, Duplicates: 0, }) if err != nil { return fmt.Errorf("创建交易流失败: %w", err) } g.Log().Infof(ctx, "✅ 交易业务流创建成功: %s (文件存储+3副本+同步刷盘)", stream.CachedInfo().Config.Name) return nil } // Publish 发布消息到指定主题 func Publish(ctx context.Context, subject string, data []byte) error { if !checkConnected() { return fmt.Errorf("NATS 未连接") } metrics.PublishCount.Add(1) _, err := js.Publish(ctx, subject, data) if err != nil { metrics.PublishError.Add(1) return fmt.Errorf("发布消息失败: %w", err) } return nil } // GetStream 获取流信息 func GetStream(ctx context.Context, streamName string) (*jetstream.StreamInfo, error) { if !checkConnected() { return nil, fmt.Errorf("NATS 未连接") } stream, err := js.Stream(ctx, streamName) if err != nil { return nil, fmt.Errorf("获取流失败: %w", err) } info, err := stream.Info(ctx) if err != nil { return nil, fmt.Errorf("获取流信息失败: %w", err) } return info, nil } // ListStreams 列出所有流(简化实现) // 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现 func ListStreams(ctx context.Context) ([]string, error) { if !checkConnected() { return nil, fmt.Errorf("NATS 未连接") } // TODO: 根据实际 NATS 版本实现完整的流列表功能 return []string{}, nil } // DeleteStream 删除流 func DeleteStream(ctx context.Context, streamName string) error { if !checkConnected() { return fmt.Errorf("NATS 未连接") } if err := js.DeleteStream(ctx, streamName); err != nil { return fmt.Errorf("删除流失败: %w", err) } g.Log().Infof(ctx, "✅ 流已删除: %s", streamName) return nil } // GetConsumer 获取消费者信息 func GetConsumer(ctx context.Context, streamName, consumerName string) (*jetstream.ConsumerInfo, error) { if !checkConnected() { return nil, fmt.Errorf("NATS 未连接") } consumer, err := js.Consumer(ctx, streamName, consumerName) if err != nil { return nil, fmt.Errorf("获取消费者失败: %w", err) } info, err := consumer.Info(ctx) if err != nil { return nil, fmt.Errorf("获取消费者信息失败: %w", err) } return info, nil } // ListConsumers 列出指定流的所有消费者(简化实现) // 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现 func ListConsumers(ctx context.Context, streamName string) ([]string, error) { if !checkConnected() { return nil, fmt.Errorf("NATS 未连接") } // TODO: 根据实际 NATS 版本实现完整的消费者列表功能 return []string{}, nil } // DeleteConsumer 删除消费者 func DeleteConsumer(ctx context.Context, streamName, consumerName string) error { if !checkConnected() { return fmt.Errorf("NATS 未连接") } if err := js.DeleteConsumer(ctx, streamName, consumerName); err != nil { return fmt.Errorf("删除消费者失败: %w", err) } g.Log().Infof(ctx, "✅ 消费者已删除: %s/%s", streamName, consumerName) return nil } // CreateConsumer 创建消费者 func CreateConsumer(ctx context.Context, streamName, consumerName string, config jetstream.ConsumerConfig) (jetstream.Consumer, error) { if !checkConnected() { return nil, fmt.Errorf("NATS 未连接") } // 尝试获取现有消费者 consumer, err := js.Consumer(ctx, streamName, consumerName) if err == nil { return consumer, nil } // 创建新消费者 consumer, err = js.CreateConsumer(ctx, streamName, config) if err != nil { return nil, fmt.Errorf("创建消费者失败: %w", err) } return consumer, nil } // ============ RPC 服务封装 ============ // 以下方法提供了完全抽象的 RPC 调用接口 // 调用方和响应方完全不需要知道底层使用的是 NATS 的发布订阅模式 // RegisterRPCService 注册 RPC 服务(单实例) // serviceName: 服务名称,调用方通过此名称调用服务 // handler: 服务处理函数,接收请求并返回响应 func RegisterRPCService(serviceName string, handler RPCHandler) error { if !checkConnected() { return fmt.Errorf("NATS 未连接") } rpcServicesMu.Lock() if rpcServices == nil { rpcServices = make(map[string]RPCHandler) } if rpcSubs == nil { rpcSubs = make(map[string]*nats.Subscription) } // 如果已存在该服务,先取消之前的订阅 if oldSub, exists := rpcSubs[serviceName]; exists { oldSub.Unsubscribe() } rpcServices[serviceName] = handler rpcServicesMu.Unlock() // 订阅服务主题 subject := fmt.Sprintf("rpc.%s", serviceName) sub, err := nc.Subscribe(subject, func(msg *nats.Msg) { ctx := context.Background() response, err := handler(ctx, msg.Data) if err != nil { errMsg := fmt.Sprintf("处理失败: %v", err) if err = msg.Respond([]byte(errMsg)); err != nil { g.Log().Errorf(ctx, "RPC 错误响应失败: %v", err) } return } if err = msg.Respond(response); err != nil { g.Log().Errorf(ctx, "RPC 响应失败: %v", err) } }) if err != nil { return fmt.Errorf("注册 RPC 服务失败: %w", err) } rpcSubs[serviceName] = sub metrics.SubscribeCount.Add(1) g.Log().Infof(context.Background(), "✅ RPC 服务已注册: %s", serviceName) return nil } // RegisterQueueRPCService 注册 RPC 服务(集群模式) // 多个服务实例注册同一服务时,请求会自动负载均衡 // serviceName: 服务名称 // queueName: 队列组名,同一队列组的实例共享请求 // handler: 服务处理函数 func RegisterQueueRPCService(serviceName, queueName string, handler RPCHandler) error { if !checkConnected() { return fmt.Errorf("NATS 未连接") } queueRPCMu.Lock() if queueRPCServices == nil { queueRPCServices = make(map[string]map[string]RPCHandler) } if queueRPCSubs == nil { queueRPCSubs = make(map[string]map[string]*nats.Subscription) } if queueRPCServices[queueName] == nil { queueRPCServices[queueName] = make(map[string]RPCHandler) } if queueRPCSubs[queueName] == nil { queueRPCSubs[queueName] = make(map[string]*nats.Subscription) } // 如果已存在该服务,先取消之前的订阅 if oldSub, exists := queueRPCSubs[queueName][serviceName]; exists { oldSub.Unsubscribe() } queueRPCServices[queueName][serviceName] = handler queueRPCMu.Unlock() // 订阅服务主题(队列模式) subject := fmt.Sprintf("rpc.%s", serviceName) sub, err := nc.QueueSubscribe(subject, queueName, func(msg *nats.Msg) { ctx := context.Background() response, err := handler(ctx, msg.Data) if err != nil { errMsg := fmt.Sprintf("处理失败: %v", err) if err = msg.Respond([]byte(errMsg)); err != nil { g.Log().Errorf(ctx, "RPC 错误响应失败: %v", err) } return } if err = msg.Respond(response); err != nil { g.Log().Errorf(ctx, "RPC 响应失败: %v", err) } }) if err != nil { return fmt.Errorf("注册队列 RPC 服务失败: %w", err) } queueRPCMu.Lock() queueRPCSubs[queueName][serviceName] = sub queueRPCMu.Unlock() metrics.SubscribeCount.Add(1) g.Log().Infof(context.Background(), "✅ 队列 RPC 服务已注册: %s (队列组: %s)", serviceName, queueName) return nil } // CallRPC 调用 RPC 服务 // serviceName: 服务名称 // req: 请求数据 // timeout: 超时时间 // 返回: 响应数据和错误 func CallRPC(ctx context.Context, serviceName string, req []byte, timeout time.Duration) ([]byte, error) { if !checkConnected() { return nil, fmt.Errorf("NATS 未连接") } metrics.RequestCount.Add(1) // 检查本地是否有注册的单实例服务,如果有则直接调用(优化性能) rpcServicesMu.RLock() if localHandler, exists := rpcServices[serviceName]; exists { rpcServicesMu.RUnlock() // 本地直接调用,避免网络开销 response, err := localHandler(ctx, req) if err != nil { metrics.RequestError.Add(1) return nil, fmt.Errorf("本地调用 RPC 服务失败 [%s]: %w", serviceName, err) } return response, nil } rpcServicesMu.RUnlock() // 通过 NATS 网络调用远程服务 timeoutCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() subject := fmt.Sprintf("rpc.%s", serviceName) msg, err := nc.RequestWithContext(timeoutCtx, subject, req) if err != nil { metrics.RequestError.Add(1) return nil, fmt.Errorf("调用 RPC 服务失败 [%s]: %w", serviceName, err) } if msg == nil { metrics.RequestError.Add(1) return nil, fmt.Errorf("RPC 响应为空 [%s]", serviceName) } return msg.Data, nil } // RegisterServiceOption 注册选项类型 type RegisterServiceOption func(*registerServiceConfig) type registerServiceConfig struct { queueName string // 队列组名(用于集群模式) excludeMethods []string } // WithQueueGroup 设置队列组名(集群模式) func WithQueueGroup(queueName string) RegisterServiceOption { return func(cfg *registerServiceConfig) { cfg.queueName = queueName } } // WithExcludeMethods 排除不需要注册的方法 func WithExcludeMethods(methods ...string) RegisterServiceOption { return func(cfg *registerServiceConfig) { cfg.excludeMethods = append(cfg.excludeMethods, methods...) } } // registerService 注册单个服务的所有公开方法(内部函数) func registerService(service interface{}, serviceNamePrefix string, options ...RegisterServiceOption) error { if !checkConnected() { return fmt.Errorf("NATS 未连接") } // 应用选项 cfg := ®isterServiceConfig{} for _, opt := range options { opt(cfg) } // 创建排除方法集合 excludeSet := make(map[string]struct{}) for _, method := range cfg.excludeMethods { excludeSet[method] = struct{}{} } // 获取 service 的类型 serviceType := reflect.TypeOf(service) // 遍历所有方法 registeredCount := 0 for i := 0; i < serviceType.NumMethod(); i++ { method := serviceType.Method(i) // 只注册导出方法(首字母大写) if !method.IsExported() { continue } // 排除指定的方法 if _, exists := excludeSet[method.Name]; exists { continue } // 检查方法签名:必须是 func(ctx context.Context, request) (response, error) if method.Type.NumIn() < 2 { g.Log().Warningf(context.Background(), "方法 %s 的参数数量不足,跳过注册", method.Name) continue } // 第一个参数必须是 context.Context if !method.Type.In(0).Implements(reflect.TypeOf((*context.Context)(nil)).Elem()) { g.Log().Warningf(context.Background(), "方法 %s 的第一个参数必须是 context.Context,跳过注册", method.Name) continue } // 返回值必须是 (result, error) 或 error if method.Type.NumOut() < 1 || method.Type.NumOut() > 2 { g.Log().Warningf(context.Background(), "方法 %s 的返回值数量不正确,跳过注册", method.Name) continue } if !method.Type.Out(method.Type.NumOut() - 1).Implements(reflect.TypeOf((*error)(nil)).Elem()) { g.Log().Warningf(context.Background(), "方法 %s 的最后一个返回值必须是 error,跳过注册", method.Name) continue } // 生成服务名称:前缀.方法名(保持原始方法名) serviceName := fmt.Sprintf("%s.%s", serviceNamePrefix, method.Name) // 创建 RPC handler handler := func(ctx context.Context, req []byte) ([]byte, error) { // 准备方法调用参数 args := make([]reflect.Value, 2) args[0] = reflect.ValueOf(ctx) // 解析请求参数 if len(req) > 0 { // 如果方法有第二个参数,尝试解析 JSON if method.Type.NumIn() > 1 { reqValuePtr := reflect.New(method.Type.In(1)) if err := json.Unmarshal(req, reqValuePtr.Interface()); err != nil { return nil, fmt.Errorf("解析请求参数失败: %w", err) } args[1] = reqValuePtr.Elem() } } else if method.Type.NumIn() > 1 { // 如果方法需要参数但请求为空,创建零值 args[1] = reflect.Zero(method.Type.In(1)) } // 调用方法 results := method.Func.Call(args) // 处理返回值 var err error var result interface{} if len(results) == 1 { // 只有 error if !results[0].IsNil() { err = results[0].Interface().(error) } } else if len(results) == 2 { // (result, error) result = results[0].Interface() if !results[1].IsNil() { err = results[1].Interface().(error) } } if err != nil { return nil, err } // 序列化返回值 if result == nil || (reflect.ValueOf(result).Kind() == reflect.Ptr && reflect.ValueOf(result).IsNil()) { return []byte("{}"), nil } return json.Marshal(result) } // 注册 RPC 服务 var err error if cfg.queueName != "" { err = RegisterQueueRPCService(serviceName, cfg.queueName, handler) } else { err = RegisterRPCService(serviceName, handler) } if err != nil { g.Log().Errorf(context.Background(), "注册服务 %s 失败: %v", serviceName, err) continue } registeredCount++ g.Log().Infof(context.Background(), "✅ 已自动注册 RPC 服务: %s -> %s", serviceName, method.Name) } if registeredCount == 0 { g.Log().Warningf(context.Background(), "未注册任何方法,请检查 %v 的方法签名", serviceNamePrefix) return fmt.Errorf("未找到可注册的方法") } g.Log().Infof(context.Background(), "✅ Service %v 共注册了 %d 个 RPC 方法", serviceNamePrefix, registeredCount) return nil } // AutoRegisterServices 自动注册多个服务的所有公开方法 // serviceInstances: map[包名]service实例,如 map[string]interface{}{"user": userService, "order": orderService} // options: 注册选项(可选) // 示例: // // AutoRegisterServices(map[string]interface{}{ // "user": userService, // "order": orderService, // }) // 或 // AutoRegisterServices(map[string]interface{}{ // "order": orderService, // }, WithQueueGroup("order-group")) func AutoRegisterServices(serviceInstances map[string]interface{}, options ...RegisterServiceOption) error { if len(serviceInstances) == 0 { return fmt.Errorf("service 实例列表不能为空") } totalRegistered := 0 // 遍历每个 service 实例 for pkgName, serviceInstance := range serviceInstances { // 注册服务 err := registerService(serviceInstance, pkgName, options...) if err != nil { g.Log().Errorf(context.Background(), "注册 %s 服务失败: %v", pkgName, err) continue } totalRegistered++ g.Log().Infof(context.Background(), "✅ %s 服务已自动注册", pkgName) } if totalRegistered == 0 { return fmt.Errorf("未能注册任何服务") } g.Log().Infof(context.Background(), "✅ 共自动注册了 %d 个服务", totalRegistered) return nil }