package service import ( "context" "customer-server/consts/account" "customer-server/model/dto" "fmt" netHttp "net/http" "gitea.redpowerfuture.com/red-future/common/beans" "gitea.redpowerfuture.com/red-future/common/jaeger" "github.com/gogf/gf/v2/container/gmap" "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/ghttp" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/grpool" "github.com/gogf/gf/v2/util/gconv" "github.com/gorilla/websocket" ) // AccountWebSocket 全局单例 var AccountWebSocket = &accountWebsocketService{ connections: gmap.NewStrAnyMap(true), upgrader: websocket.Upgrader{ CheckOrigin: func(r *netHttp.Request) bool { return true // 允许跨域 }, }, workerPool: grpool.New(50), // 限制最大并发数为50 } type accountWebsocketService struct { connections *gmap.StrAnyMap upgrader websocket.Upgrader workerPool *grpool.Pool // 工作池限制goroutine数量 } // key: userId_platform // accountWsConnection WebSocket 连接信息 type accountWsConnection struct { AccountInfo *dto.AccountVO UserId string Conn *websocket.Conn Headers map[string]string // 保存原始请求头 } // Connect 建立 WebSocket 连接 func (s *accountWebsocketService) Connect(ctx context.Context, r *ghttp.Request, req *dto.AccountWebSocketConnectReq) error { // 使用原生upgrader升级WebSocket连接 ws, err := s.upgrader.Upgrade(r.Response.Writer, r.Request, nil) if err != nil { jaeger.RecordError(ctx, err, "WebSocket 升级失败") return err } // 获取客服账号信息 accountInfo, err := SessionToolService.GetAccountInfo(ctx, req.AccountCode) if err != nil { return err } if g.IsEmpty(accountInfo) { return fmt.Errorf("客服账号不存在") } // 创建完整的用户信息 userInfo := &beans.User{ UserName: accountInfo.Creator, TenantId: accountInfo.TenantId, } ctx = context.WithValue(ctx, "user", *userInfo) // 提取并保存请求头(在连接升级前) headers := make(map[string]string) // 提取其他headers for k, v := range r.Request.Header { if len(v) > 0 { headers[k] = v[0] } } // 将完整用户信息序列化为JSON,放到X-User-Info请求头 userInfoJson, err := gjson.Encode(userInfo) if err != nil { glog.Errorf(ctx, "用户信息序列化失败: %v", err) } else { headers["X-User-Info"] = string(userInfoJson) glog.Debugf(ctx, "已添加用户信息到请求头: %s", string(userInfoJson)) } var key = fmt.Sprintf("account:%s:%s:%s", req.AccountCode, account.GetDescByCode(req.Platform), req.UserId) content, err := SessionToolService.PushOpeningRemark(ctx, req.UserId, accountInfo, headers) if err != nil { return err } if !g.IsEmpty(content) { s.writeJSON(ws, &dto.WebSocketPushMsg{ Type: "message", Message: content, }) } // 关闭旧连接 if old := s.connections.Get(key); old != nil { old.(*accountWsConnection).Conn.Close() } // 注册新连接(携带完整用户信息) s.connections.Set(key, &accountWsConnection{ AccountInfo: accountInfo, UserId: req.UserId, Conn: ws, Headers: headers, // 保存请求头 }) // 处理消息(阻塞) s.handleConnection(ctx, key, ws) return nil } // handleConnection 处理 WebSocket 连接 func (s *accountWebsocketService) handleConnection(ctx context.Context, key string, conn *websocket.Conn) { defer func() { s.connections.Remove(key) conn.Close() glog.Infof(ctx, "WebSocket 连接断开 - %s", key) }() for { msgType, message, err := conn.ReadMessage() if err != nil { // 排除正常关闭情况:正常关闭、离开页面、无状态码关闭 if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived, ) { jaeger.RecordError(ctx, err, "WebSocket 读取错误") } break } if msgType != websocket.TextMessage { continue } questionContent := gconv.String(message) glog.Infof(ctx, "收到 WebSocket 消息 - %s: %s", key, questionContent) // 解析连接信息 connInfo := s.connections.Get(key) if connInfo == nil { glog.Warningf(ctx, "WebSocket连接信息不存在 - %s", key) break } wsConn := connInfo.(*accountWsConnection) // 发送ack告知用户正在处理 s.writeJSON(conn, &dto.WebSocketPushMsg{Type: "ack", Message: "消息已接收,正在处理..."}) // 异步处理消息,避免阻塞WebSocket连接,使用工作池限制并发 s.workerPool.Add(ctx, func(poolCtx context.Context) { defer func() { if r := recover(); r != nil { glog.Errorf(ctx, "WebSocket处理消息失败: %v", r) } }() var content string content, err = SessionToolService.PushDialog(ctx, wsConn.UserId, questionContent, wsConn.AccountInfo, wsConn.Headers) if err != nil { s.writeJSON(conn, &dto.WebSocketPushMsg{ Type: "error", Content: err.Error(), }) return } // 发送答案给前端 s.writeJSON(conn, &dto.WebSocketPushMsg{ Type: "answer", Content: content, }) }) } } // writeJSON 发送 JSON 消息(带错误处理) func (s *accountWebsocketService) writeJSON(conn *websocket.Conn, data interface{}) { jsonBytes, err := gjson.Encode(data) if err != nil { glog.Errorf(context.Background(), "JSON编码失败: %v", err) return } if err := conn.WriteMessage(websocket.TextMessage, jsonBytes); err != nil { glog.Errorf(context.Background(), "WebSocket写入失败: %v", err) } } func (s *accountWebsocketService) AccountMsg(ctx context.Context, msg any) (err error) { msgStr := gconv.Map(msg) if g.IsEmpty(msgStr) { g.Log().Error(ctx, "DocsChunkMsg err:", "msg is empty") return } // 直接通过 key 获取连接 connAny := s.connections.Get(gconv.String(msgStr["key"])) if connAny != nil { wsConn := connAny.(*accountWsConnection) s.writeJSON(wsConn.Conn, &dto.WebSocketPushMsg{ Type: "delay_msg", Content: gconv.String(msgStr["data"]), }) } g.Log().Info(ctx, "DocsChunkMsg:", msgStr) return } // Close 释放所有资源 func (s *accountWebsocketService) Close() { if s.workerPool != nil { s.workerPool.Close() glog.Info(context.Background(), "WebSocket工作池已关闭") } // 关闭所有WebSocket连接 s.connections.LockFunc(func(m map[string]interface{}) { for key, conn := range m { if wsConn, ok := conn.(*accountWsConnection); ok && wsConn.Conn != nil { wsConn.Conn.Close() glog.Infof(context.Background(), "强制关闭WebSocket连接 - %s", key) } } }) s.connections.Clear() glog.Info(context.Background(), "WebSocket连接池已清空") }