2026-04-03 17:52:09 +08:00
|
|
|
|
package service
|
|
|
|
|
|
|
|
|
|
|
|
import (
	"context"
	"fmt"
	netHttp "net/http"
	"sync"
	"time"

	"customer-server/consts/account"
	"customer-server/model/dto"

	"gitea.redpowerfuture.com/red-future/common/beans"
	"gitea.redpowerfuture.com/red-future/common/jaeger"
	"github.com/gogf/gf/v2/container/gmap"
	"github.com/gogf/gf/v2/encoding/gjson"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
	"github.com/gogf/gf/v2/os/glog"
	"github.com/gogf/gf/v2/os/grpool"
	"github.com/gogf/gf/v2/util/gconv"
	"github.com/gorilla/websocket"
)
|
|
|
|
|
|
|
|
|
|
|
|
// AccountWebSocket 全局单例
|
|
|
|
|
|
var AccountWebSocket = &accountWebsocketService{
|
|
|
|
|
|
connections: gmap.NewStrAnyMap(true),
|
|
|
|
|
|
upgrader: websocket.Upgrader{
|
2026-04-11 18:22:52 +08:00
|
|
|
|
CheckOrigin: func(r *netHttp.Request) bool {
|
2026-04-03 17:52:09 +08:00
|
|
|
|
return true // 允许跨域
|
|
|
|
|
|
},
|
|
|
|
|
|
},
|
2026-04-11 18:22:52 +08:00
|
|
|
|
workerPool: grpool.New(50), // 限制最大并发数为50
|
2026-04-03 17:52:09 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
type accountWebsocketService struct {
|
|
|
|
|
|
connections *gmap.StrAnyMap
|
|
|
|
|
|
upgrader websocket.Upgrader
|
2026-04-11 18:22:52 +08:00
|
|
|
|
workerPool *grpool.Pool // 工作池限制goroutine数量
|
2026-04-03 17:52:09 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// key: userId_platform
|
|
|
|
|
|
// accountWsConnection WebSocket 连接信息
|
|
|
|
|
|
type accountWsConnection struct {
|
2026-04-11 18:22:52 +08:00
|
|
|
|
AccountInfo *dto.AccountVO
|
2026-04-03 17:52:09 +08:00
|
|
|
|
UserId string
|
|
|
|
|
|
Conn *websocket.Conn
|
2026-04-11 18:22:52 +08:00
|
|
|
|
Headers map[string]string // 保存原始请求头
|
2026-04-03 17:52:09 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Connect 建立 WebSocket 连接
|
2026-04-11 18:22:52 +08:00
|
|
|
|
func (s *accountWebsocketService) Connect(ctx context.Context, r *ghttp.Request, req *dto.AccountWebSocketConnectReq) error {
|
2026-04-03 17:52:09 +08:00
|
|
|
|
// 使用原生upgrader升级WebSocket连接
|
|
|
|
|
|
ws, err := s.upgrader.Upgrade(r.Response.Writer, r.Request, nil)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
jaeger.RecordError(ctx, err, "WebSocket 升级失败")
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
2026-04-11 18:22:52 +08:00
|
|
|
|
|
|
|
|
|
|
// 获取客服账号信息
|
|
|
|
|
|
accountInfo, err := SessionToolService.GetAccountInfo(ctx, req.AccountCode)
|
2026-04-03 17:52:09 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
2026-04-11 18:22:52 +08:00
|
|
|
|
if g.IsEmpty(accountInfo) {
|
|
|
|
|
|
return fmt.Errorf("客服账号不存在")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 创建完整的用户信息
|
|
|
|
|
|
userInfo := &beans.User{
|
|
|
|
|
|
UserName: accountInfo.Creator,
|
|
|
|
|
|
TenantId: accountInfo.TenantId,
|
|
|
|
|
|
}
|
|
|
|
|
|
ctx = context.WithValue(ctx, "user", *userInfo)
|
|
|
|
|
|
// 提取并保存请求头(在连接升级前)
|
|
|
|
|
|
headers := make(map[string]string)
|
|
|
|
|
|
// 提取其他headers
|
|
|
|
|
|
for k, v := range r.Request.Header {
|
|
|
|
|
|
if len(v) > 0 {
|
|
|
|
|
|
headers[k] = v[0]
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
// 将完整用户信息序列化为JSON,放到X-User-Info请求头
|
|
|
|
|
|
userInfoJson, err := gjson.Encode(userInfo)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
glog.Errorf(ctx, "用户信息序列化失败: %v", err)
|
|
|
|
|
|
} else {
|
|
|
|
|
|
headers["X-User-Info"] = string(userInfoJson)
|
|
|
|
|
|
glog.Debugf(ctx, "已添加用户信息到请求头: %s", string(userInfoJson))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
var key = fmt.Sprintf("account:%s:%s:%s", req.AccountCode, account.GetDescByCode(req.Platform), req.UserId)
|
|
|
|
|
|
content, err := SessionToolService.PushOpeningRemark(ctx, req.UserId, accountInfo, headers)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
2026-04-03 17:52:09 +08:00
|
|
|
|
}
|
2026-04-11 18:22:52 +08:00
|
|
|
|
if !g.IsEmpty(content) {
|
2026-04-03 17:52:09 +08:00
|
|
|
|
s.writeJSON(ws, &dto.WebSocketPushMsg{
|
|
|
|
|
|
Type: "message",
|
2026-04-11 18:22:52 +08:00
|
|
|
|
Message: content,
|
2026-04-03 17:52:09 +08:00
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭旧连接
|
|
|
|
|
|
if old := s.connections.Get(key); old != nil {
|
|
|
|
|
|
old.(*accountWsConnection).Conn.Close()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-11 18:22:52 +08:00
|
|
|
|
// 注册新连接(携带完整用户信息)
|
2026-04-03 17:52:09 +08:00
|
|
|
|
s.connections.Set(key, &accountWsConnection{
|
2026-04-11 18:22:52 +08:00
|
|
|
|
AccountInfo: accountInfo,
|
|
|
|
|
|
UserId: req.UserId,
|
2026-04-03 17:52:09 +08:00
|
|
|
|
Conn: ws,
|
2026-04-11 18:22:52 +08:00
|
|
|
|
Headers: headers, // 保存请求头
|
2026-04-03 17:52:09 +08:00
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
// 处理消息(阻塞)
|
|
|
|
|
|
s.handleConnection(ctx, key, ws)
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// handleConnection 处理 WebSocket 连接
|
|
|
|
|
|
func (s *accountWebsocketService) handleConnection(ctx context.Context, key string, conn *websocket.Conn) {
|
|
|
|
|
|
defer func() {
|
|
|
|
|
|
s.connections.Remove(key)
|
|
|
|
|
|
conn.Close()
|
|
|
|
|
|
glog.Infof(ctx, "WebSocket 连接断开 - %s", key)
|
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
|
msgType, message, err := conn.ReadMessage()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
// 排除正常关闭情况:正常关闭、离开页面、无状态码关闭
|
|
|
|
|
|
if websocket.IsUnexpectedCloseError(err,
|
|
|
|
|
|
websocket.CloseNormalClosure,
|
|
|
|
|
|
websocket.CloseGoingAway,
|
|
|
|
|
|
websocket.CloseNoStatusReceived,
|
|
|
|
|
|
) {
|
|
|
|
|
|
jaeger.RecordError(ctx, err, "WebSocket 读取错误")
|
|
|
|
|
|
}
|
|
|
|
|
|
break
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if msgType != websocket.TextMessage {
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-11 18:22:52 +08:00
|
|
|
|
questionContent := gconv.String(message)
|
|
|
|
|
|
glog.Infof(ctx, "收到 WebSocket 消息 - %s: %s", key, questionContent)
|
|
|
|
|
|
|
|
|
|
|
|
// 解析连接信息
|
|
|
|
|
|
connInfo := s.connections.Get(key)
|
|
|
|
|
|
if connInfo == nil {
|
|
|
|
|
|
glog.Warningf(ctx, "WebSocket连接信息不存在 - %s", key)
|
|
|
|
|
|
break
|
|
|
|
|
|
}
|
|
|
|
|
|
wsConn := connInfo.(*accountWsConnection)
|
|
|
|
|
|
|
|
|
|
|
|
// 发送ack告知用户正在处理
|
|
|
|
|
|
s.writeJSON(conn, &dto.WebSocketPushMsg{Type: "ack", Message: "消息已接收,正在处理..."})
|
|
|
|
|
|
|
|
|
|
|
|
// 异步处理消息,避免阻塞WebSocket连接,使用工作池限制并发
|
|
|
|
|
|
s.workerPool.Add(ctx, func(poolCtx context.Context) {
|
|
|
|
|
|
defer func() {
|
|
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
|
|
glog.Errorf(ctx, "WebSocket处理消息失败: %v", r)
|
|
|
|
|
|
}
|
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
var content string
|
|
|
|
|
|
content, err = SessionToolService.PushDialog(ctx, wsConn.UserId, questionContent, wsConn.AccountInfo, wsConn.Headers)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
s.writeJSON(conn, &dto.WebSocketPushMsg{
|
|
|
|
|
|
Type: "error",
|
|
|
|
|
|
Content: err.Error(),
|
|
|
|
|
|
})
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
// 发送答案给前端
|
|
|
|
|
|
s.writeJSON(conn, &dto.WebSocketPushMsg{
|
|
|
|
|
|
Type: "answer",
|
|
|
|
|
|
Content: content,
|
|
|
|
|
|
})
|
|
|
|
|
|
})
|
2026-04-03 17:52:09 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-11 18:22:52 +08:00
|
|
|
|
// writeJSON 发送 JSON 消息(带错误处理)
|
2026-04-03 17:52:09 +08:00
|
|
|
|
func (s *accountWebsocketService) writeJSON(conn *websocket.Conn, data interface{}) {
|
2026-04-11 18:22:52 +08:00
|
|
|
|
jsonBytes, err := gjson.Encode(data)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
glog.Errorf(context.Background(), "JSON编码失败: %v", err)
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
if err := conn.WriteMessage(websocket.TextMessage, jsonBytes); err != nil {
|
|
|
|
|
|
glog.Errorf(context.Background(), "WebSocket写入失败: %v", err)
|
|
|
|
|
|
}
|
2026-04-03 17:52:09 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-11 18:22:52 +08:00
|
|
|
|
func (s *accountWebsocketService) AccountMsg(ctx context.Context, msg any) (err error) {
|
|
|
|
|
|
msgStr := gconv.Map(msg)
|
|
|
|
|
|
if g.IsEmpty(msgStr) {
|
|
|
|
|
|
g.Log().Error(ctx, "DocsChunkMsg err:", "msg is empty")
|
2026-04-03 17:52:09 +08:00
|
|
|
|
return
|
|
|
|
|
|
}
|
2026-04-11 18:22:52 +08:00
|
|
|
|
// 直接通过 key 获取连接
|
|
|
|
|
|
connAny := s.connections.Get(gconv.String(msgStr["key"]))
|
|
|
|
|
|
if connAny != nil {
|
|
|
|
|
|
wsConn := connAny.(*accountWsConnection)
|
|
|
|
|
|
s.writeJSON(wsConn.Conn, &dto.WebSocketPushMsg{
|
|
|
|
|
|
Type: "delay_msg",
|
|
|
|
|
|
Content: gconv.String(msgStr["data"]),
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
g.Log().Info(ctx, "DocsChunkMsg:", msgStr)
|
2026-04-03 17:52:09 +08:00
|
|
|
|
return
|
|
|
|
|
|
}
|
2026-04-11 18:22:52 +08:00
|
|
|
|
|
|
|
|
|
|
// Close 释放所有资源
|
|
|
|
|
|
func (s *accountWebsocketService) Close() {
|
|
|
|
|
|
if s.workerPool != nil {
|
|
|
|
|
|
s.workerPool.Close()
|
|
|
|
|
|
glog.Info(context.Background(), "WebSocket工作池已关闭")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭所有WebSocket连接
|
|
|
|
|
|
s.connections.LockFunc(func(m map[string]interface{}) {
|
|
|
|
|
|
for key, conn := range m {
|
|
|
|
|
|
if wsConn, ok := conn.(*accountWsConnection); ok && wsConn.Conn != nil {
|
|
|
|
|
|
wsConn.Conn.Close()
|
|
|
|
|
|
glog.Infof(context.Background(), "强制关闭WebSocket连接 - %s", key)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
})
|
|
|
|
|
|
s.connections.Clear()
|
|
|
|
|
|
glog.Info(context.Background(), "WebSocket连接池已清空")
|
|
|
|
|
|
}
|