package service

import (
	"context"
	"customer-server/consts/account"
	"customer-server/dao"
	"customer-server/model/dto"
	"customer-server/model/entity"
	"errors"
	"net/http"

	"gitea.com/red-future/common/jaeger"
	"github.com/gogf/gf/v2/container/gmap"
	"github.com/gogf/gf/v2/encoding/gjson"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
	"github.com/gogf/gf/v2/os/glog"
	"github.com/gogf/gf/v2/os/gtime"
	"github.com/gogf/gf/v2/util/gconv"
	"github.com/gorilla/websocket"
)

// AccountWebSocket is the package-wide singleton that upgrades HTTP requests
// to WebSocket connections and tracks them per customer-service account.
//
// NOTE(review): CheckOrigin accepts every origin, so a page served from any
// site can open a socket against this endpoint. Confirm this is intended or
// restrict it to an allow-list.
var AccountWebSocket = &accountWebsocketService{
	connections: gmap.NewStrAnyMap(true),
	upgrader: websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true // allow cross-origin requests
		},
	},
}

// accountWebsocketService owns the WebSocket upgrader and the registry of
// live connections.
type accountWebsocketService struct {
	// connections maps "tenantId:creator:platform" to *accountWsConnection.
	connections *gmap.StrAnyMap
	upgrader    websocket.Upgrader
}

// accountWsConnection describes one registered WebSocket connection.
type accountWsConnection struct {
	UserId      string
	Platform    account.Platform
	TenantId    uint64
	AccountName string // customer-service account ID
	Conn        *websocket.Conn
	CreatedAt   int64 // Unix timestamp (seconds) taken at registration
}

// Connect upgrades the request to a WebSocket, sends the account's greeting
// when one is configured, registers the connection and then blocks in the
// read loop until the connection ends.
//
// accountName identifies the customer-service account and must not be empty;
// platform becomes part of the registry key. An error is returned when
// accountName is empty, when the account lookup fails or yields no record, or
// when the upgrade fails. Once the read loop ends, nil is returned.
func (s *accountWebsocketService) Connect(ctx context.Context, r *ghttp.Request, accountName string, platform account.Platform) error {
	// Validate and load the account before upgrading: after the upgrade the
	// HTTP connection is hijacked, so a returned error can no longer be
	// delivered to the client as a normal HTTP response.
	if g.IsEmpty(accountName) {
		return errors.New("accountName is empty")
	}
	acct, err := s.getGreeting(ctx, accountName)
	if err != nil {
		return err
	}
	// Check the pointer itself. Testing the address of the local variable
	// can never report "empty", which let a missing account reach the field
	// accesses below and panic.
	if acct == nil {
		return errors.New("account is empty")
	}

	ws, err := s.upgrader.Upgrade(r.Response.Writer, r.Request, nil)
	if err != nil {
		jaeger.RecordError(ctx, err, "WebSocket 升级失败")
		return err
	}
	defer ws.Close()

	if g.IsEmpty(acct.Greeting) {
		glog.Warningf(ctx, "客服账号未配置开场白 - accountName: %s, tenantId: %v", accountName, acct.TenantId)
	} else if sendErr := s.writeJSON(ws, &dto.WebSocketPushMsg{
		Type:    "message",
		Message: acct.Greeting,
	}); sendErr != nil {
		// Best effort: a broken socket is detected and cleaned up by the
		// read loop, so only record the failure here.
		jaeger.RecordError(ctx, sendErr, "发送开场白失败")
	} else {
		glog.Infof(ctx, "已发送开场白 - 用户: %v, 客服账号: %s, 长度: %d", acct.Id, accountName, len(acct.Greeting))
	}

	// Key format: tenantId:creator:platform (the tenant prefix keeps tenants
	// isolated from each other).
	key := gconv.String(acct.TenantId) + ":" + gconv.String(acct.Creator) + ":" + gconv.String(platform)

	entry := &accountWsConnection{
		UserId:      acct.Creator,
		Platform:    platform,
		TenantId:    acct.TenantId,
		AccountName: accountName,
		Conn:        ws,
		CreatedAt:   gtime.Now().Timestamp(),
	}

	// Swap the registration under the map lock so that two concurrent
	// connects for the same key cannot both miss the connection they replace.
	var displaced *accountWsConnection
	s.connections.LockFunc(func(m map[string]interface{}) {
		displaced, _ = m[key].(*accountWsConnection)
		m[key] = entry
	})
	if displaced != nil && displaced.Conn != nil {
		displaced.Conn.Close()
	}

	// Process incoming messages (blocking).
	s.handleConnection(ctx, key, ws)
	return nil
}

// handleConnection runs the read loop for conn until a read fails or the key
// is no longer registered, then unregisters and closes the connection.
//
// Only text frames are processed; every other frame type is skipped. Read
// errors are recorded unless they are a normal closure, a going-away closure
// or a closure without a status code.
func (s *accountWebsocketService) handleConnection(ctx context.Context, key string, conn *websocket.Conn) {
	defer func() {
		// Remove the entry only while it still points at this connection.
		// When a reconnect replaced it, the key now belongs to the newer
		// connection and must stay registered.
		s.connections.LockFunc(func(m map[string]interface{}) {
			if cur, ok := m[key].(*accountWsConnection); ok && cur.Conn == conn {
				delete(m, key)
			}
		})
		conn.Close()
		glog.Infof(ctx, "WebSocket 连接断开 - %s", key)
	}()

	for {
		msgType, message, err := conn.ReadMessage()
		if err != nil {
			// Expected shutdowns are not errors: normal closure, the peer
			// leaving the page, or a close frame without a status code.
			if websocket.IsUnexpectedCloseError(err,
				websocket.CloseNormalClosure,
				websocket.CloseGoingAway,
				websocket.CloseNoStatusReceived,
			) {
				jaeger.RecordError(ctx, err, "WebSocket 读取错误")
			}
			return
		}
		if msgType != websocket.TextMessage {
			continue
		}

		content := gconv.String(message)
		glog.Infof(ctx, "收到 WebSocket 消息 - %s: %s", key, content)

		// Look up the registration for this key; stop when it is gone.
		wsConn, ok := s.connections.Get(key).(*accountWsConnection)
		if !ok {
			return
		}
		// wsConn is consumed only by the disabled pipeline below; the blank
		// assignment keeps the file compiling until that code is restored.
		_ = wsConn

		// Check the conversation round count first; above 5 only the card is
		// sent and the scripted reply is skipped.
		// checkCardBeforeProcess has already pushed the card message, so no
		// ack is needed.
		//if handled, err := checkCardBeforeProcess(ctx, wsConn.TenantId, wsConn.UserId, wsConn.Platform); err != nil {
		//	jaeger.RecordError(ctx, err, "卡片检查失败")
		//} else if handled {
		//	continue
		//}

		// Match a scripted reply and publish the response.
		// status is empty for now, meaning any behaviour matches.
		// isPushed=true: the response was pushed directly (script matched),
		// no ack needed.
		// isPushed=false: the message was forwarded to RAGFlow, so an ack
		// tells the user it is being processed.
		// Build a context carrying accountName for GetTenantInfo.
		//newCtx := ctx
		//if wsConn.AccountName != "" {
		//	newCtx = context.WithValue(ctx, "accountName", wsConn.AccountName)
		//}
		//isPushed, err := Speechcraft.ProcessAndPublish(newCtx, wsConn.UserId, wsConn.Platform, wsConn.TenantId, content, "", wsConn.AccountName)
		//if err != nil {
		//	jaeger.RecordError(ctx, err, "话术处理失败")
		//	s.writeJSON(conn, &dto.WebSocketPushMsg{Type: "error", Message: "消息处理失败"})
		//	continue
		//}

		// Send an ack only when forwarding to RAGFlow (responses returned
		// directly by Go do not need one).
		//if !isPushed {
		//	s.writeJSON(conn, &dto.WebSocketPushMsg{Type: "ack", Message: "消息已接收,正在处理..."})
		//}
	}
}

// writeJSON encodes data as JSON and sends it to conn as a single text frame.
// It returns the encoding or write error, if any; nothing is written when
// encoding fails.
func (s *accountWebsocketService) writeJSON(conn *websocket.Conn, data interface{}) error {
	payload, err := gjson.Encode(data)
	if err != nil {
		return err
	}
	return conn.WriteMessage(websocket.TextMessage, payload)
}

// getGreeting loads the account record for accountName; callers read the
// greeting and the tenant/creator identifiers from it.
//
// A lookup failure is recorded and logged here, then returned with a nil
// record. The record may be nil without an error, so callers must check it.
func (s *accountWebsocketService) getGreeting(ctx context.Context, accountName string) (res *entity.Account, err error) {
	res, err = dao.Account.GetByAccountName(ctx, &dto.GetByAccountNameReq{
		AccountName: accountName,
	})
	if err != nil {
		jaeger.RecordError(ctx, err, "查询客服账号开场白失败")
		glog.Errorf(ctx, "查询开场白失败: %v", err)
		return nil, err
	}
	return res, nil
}