feat: 新增账号编码和HTTP连接功能

This commit is contained in:
2026-04-11 18:22:52 +08:00
parent 2f5c4f7e54
commit f8927afa9c
94 changed files with 1213 additions and 10230 deletions

View File

@@ -3,19 +3,18 @@ package service
import (
"context"
"customer-server/consts/account"
"customer-server/dao"
"customer-server/model/dto"
"customer-server/model/entity"
"errors"
"net/http"
"fmt"
netHttp "net/http"
"gitea.com/red-future/common/beans"
"gitea.com/red-future/common/jaeger"
"github.com/gogf/gf/v2/container/gmap"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/os/grpool"
"github.com/gogf/gf/v2/util/gconv"
"github.com/gorilla/websocket"
)
@@ -24,73 +23,92 @@ import (
var AccountWebSocket = &accountWebsocketService{
connections: gmap.NewStrAnyMap(true),
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
CheckOrigin: func(r *netHttp.Request) bool {
return true // 允许跨域
},
},
workerPool: grpool.New(50), // 限制最大并发数为50
}
type accountWebsocketService struct {
connections *gmap.StrAnyMap
upgrader websocket.Upgrader
workerPool *grpool.Pool // 工作池限制goroutine数量
}
// key: userId_platform
// accountWsConnection WebSocket 连接信息
type accountWsConnection struct {
AccountInfo *dto.AccountVO
UserId string
Platform account.Platform
TenantId uint64
AccountName string // 客服账号ID
Conn *websocket.Conn
CreatedAt int64
Headers map[string]string // 保存原始请求头
}
// Connect 建立 WebSocket 连接
func (s *accountWebsocketService) Connect(ctx context.Context, r *ghttp.Request, accountName string, platform account.Platform) error {
func (s *accountWebsocketService) Connect(ctx context.Context, r *ghttp.Request, req *dto.AccountWebSocketConnectReq) error {
// 使用原生upgrader升级WebSocket连接
ws, err := s.upgrader.Upgrade(r.Response.Writer, r.Request, nil)
if err != nil {
jaeger.RecordError(ctx, err, "WebSocket 升级失败")
return err
}
defer ws.Close()
if g.IsEmpty(accountName) {
return errors.New("accountName is empty")
}
res, err := s.getGreeting(ctx, accountName)
// 获取客服账号信息
accountInfo, err := SessionToolService.GetAccountInfo(ctx, req.AccountCode)
if err != nil {
return err
}
if g.IsEmpty(&res) {
return errors.New("account is empty")
}
if !g.IsEmpty(res.Greeting) {
s.writeJSON(ws, &dto.WebSocketPushMsg{
Type: "message",
Message: res.Greeting,
})
glog.Infof(ctx, "已发送开场白 - 用户: %v, 客服账号: %s, 长度: %d", res.Id, accountName, len(res.Greeting))
} else {
glog.Warningf(ctx, "客服账号未配置开场白 - accountName: %s, tenantId: %v", accountName, res.TenantId)
if g.IsEmpty(accountInfo) {
return fmt.Errorf("客服账号不存在")
}
// key格式: tenantId:userId_platform (确保租户隔离)
key := gconv.String(res.TenantId) + ":" + gconv.String(res.Creator) + ":" + gconv.String(platform)
// 创建完整的用户信息
userInfo := &beans.User{
UserName: accountInfo.Creator,
TenantId: accountInfo.TenantId,
}
ctx = context.WithValue(ctx, "user", *userInfo)
// 提取并保存请求头(在连接升级前)
headers := make(map[string]string)
// 提取其他headers
for k, v := range r.Request.Header {
if len(v) > 0 {
headers[k] = v[0]
}
}
// 将完整用户信息序列化为JSON放到X-User-Info请求头
userInfoJson, err := gjson.Encode(userInfo)
if err != nil {
glog.Errorf(ctx, "用户信息序列化失败: %v", err)
} else {
headers["X-User-Info"] = string(userInfoJson)
glog.Debugf(ctx, "已添加用户信息到请求头: %s", string(userInfoJson))
}
var key = fmt.Sprintf("account:%s:%s:%s", req.AccountCode, account.GetDescByCode(req.Platform), req.UserId)
content, err := SessionToolService.PushOpeningRemark(ctx, req.UserId, accountInfo, headers)
if err != nil {
return err
}
if !g.IsEmpty(content) {
s.writeJSON(ws, &dto.WebSocketPushMsg{
Type: "message",
Message: content,
})
}
// 关闭旧连接
if old := s.connections.Get(key); old != nil {
old.(*accountWsConnection).Conn.Close()
}
// 注册新连接(携带 TenantId 和 AccountName
// 注册新连接(携带完整用户信息
s.connections.Set(key, &accountWsConnection{
UserId: res.Creator,
Platform: platform,
TenantId: res.TenantId,
AccountName: accountName,
AccountInfo: accountInfo,
UserId: req.UserId,
Conn: ws,
CreatedAt: gtime.Now().Timestamp(),
Headers: headers, // 保存请求头
})
// 处理消息(阻塞)
@@ -124,64 +142,93 @@ func (s *accountWebsocketService) handleConnection(ctx context.Context, key stri
continue
}
content := gconv.String(message)
glog.Infof(ctx, "收到 WebSocket 消息 - %s: %s", key, content)
questionContent := gconv.String(message)
glog.Infof(ctx, "收到 WebSocket 消息 - %s: %s", key, questionContent)
// 解析 userId
//connInfo := s.connections.Get(key)
//if connInfo == nil {
// break
//}
//wsConn := connInfo.(*accountWsConnection)
// 解析连接信息
connInfo := s.connections.Get(key)
if connInfo == nil {
glog.Warningf(ctx, "WebSocket连接信息不存在 - %s", key)
break
}
wsConn := connInfo.(*accountWsConnection)
// 先检查对话轮数,>5 则只发卡片,跳过话术
//checkCardBeforeProcess 已推送卡片消息无需ack
//if handled, err := checkCardBeforeProcess(ctx, wsConn.TenantId, wsConn.UserId, wsConn.Platform); err != nil {
// jaeger.RecordError(ctx, err, "卡片检查失败")
//} else if handled {
// continue
//}
// 发送ack告知用户正在处理
s.writeJSON(conn, &dto.WebSocketPushMsg{Type: "ack", Message: "消息已接收,正在处理..."})
// 话术匹配并发布响应
// status 暂时为空,表示任意行为匹配
// isPushed=true表示已直接推送响应话术匹配无需ack
// isPushed=false表示转发到RAGFlow需要ack告知用户正在处理
// 异步处理消息避免阻塞WebSocket连接使用工作池限制并发
s.workerPool.Add(ctx, func(poolCtx context.Context) {
defer func() {
if r := recover(); r != nil {
glog.Errorf(ctx, "WebSocket处理消息失败: %v", r)
}
}()
// 创建带有accountName的context供GetTenantInfo使用
//newCtx := ctx
//if wsConn.AccountName != "" {
// newCtx = context.WithValue(ctx, "accountName", wsConn.AccountName)
//}
//isPushed, err := Speechcraft.ProcessAndPublish(newCtx, wsConn.UserId, wsConn.Platform, wsConn.TenantId, content, "", wsConn.AccountName)
//if err != nil {
// jaeger.RecordError(ctx, err, "话术处理失败")
// s.writeJSON(conn, &dto.WebSocketPushMsg{Type: "error", Message: "消息处理失败"})
// continue
//}
// 只在转发到RAGFlow时发送ackGo直接返回的不需要ack
//if !isPushed {
// s.writeJSON(conn, &dto.WebSocketPushMsg{Type: "ack", Message: "消息已接收,正在处理..."})
//}
var content string
content, err = SessionToolService.PushDialog(ctx, wsConn.UserId, questionContent, wsConn.AccountInfo, wsConn.Headers)
if err != nil {
s.writeJSON(conn, &dto.WebSocketPushMsg{
Type: "error",
Content: err.Error(),
})
return
}
// 发送答案给前端
s.writeJSON(conn, &dto.WebSocketPushMsg{
Type: "answer",
Content: content,
})
})
}
}
// writeJSON 发送 JSON 消息
// writeJSON 发送 JSON 消息(带错误处理)
func (s *accountWebsocketService) writeJSON(conn *websocket.Conn, data interface{}) {
jsonBytes, _ := gjson.Encode(data)
conn.WriteMessage(websocket.TextMessage, jsonBytes)
}
// getGreeting 获取客服账号的开场白
func (s *accountWebsocketService) getGreeting(ctx context.Context, accountName string) (res *entity.Account, err error) {
res, err = dao.Account.GetByAccountName(ctx, &dto.GetByAccountNameReq{
AccountName: accountName,
})
jsonBytes, err := gjson.Encode(data)
if err != nil {
jaeger.RecordError(ctx, err, "查询客服账号开场白失败")
glog.Errorf(ctx, "查询开场白失败: %v", err)
glog.Errorf(context.Background(), "JSON编码失败: %v", err)
return
}
if err := conn.WriteMessage(websocket.TextMessage, jsonBytes); err != nil {
glog.Errorf(context.Background(), "WebSocket写入失败: %v", err)
}
}
func (s *accountWebsocketService) AccountMsg(ctx context.Context, msg any) (err error) {
msgStr := gconv.Map(msg)
if g.IsEmpty(msgStr) {
g.Log().Error(ctx, "DocsChunkMsg err:", "msg is empty")
return
}
// 直接通过 key 获取连接
connAny := s.connections.Get(gconv.String(msgStr["key"]))
if connAny != nil {
wsConn := connAny.(*accountWsConnection)
s.writeJSON(wsConn.Conn, &dto.WebSocketPushMsg{
Type: "delay_msg",
Content: gconv.String(msgStr["data"]),
})
}
g.Log().Info(ctx, "DocsChunkMsg:", msgStr)
return
}
// Close 释放所有资源
func (s *accountWebsocketService) Close() {
if s.workerPool != nil {
s.workerPool.Close()
glog.Info(context.Background(), "WebSocket工作池已关闭")
}
// 关闭所有WebSocket连接
s.connections.LockFunc(func(m map[string]interface{}) {
for key, conn := range m {
if wsConn, ok := conn.(*accountWsConnection); ok && wsConn.Conn != nil {
wsConn.Conn.Close()
glog.Infof(context.Background(), "强制关闭WebSocket连接 - %s", key)
}
}
})
s.connections.Clear()
glog.Info(context.Background(), "WebSocket连接池已清空")
}