460 lines
13 KiB
Go
460 lines
13 KiB
Go
package service
|
||
|
||
import (
|
||
"context"
|
||
"crypto/aes"
|
||
"crypto/cipher"
|
||
"crypto/rand"
|
||
"crypto/sha256"
|
||
"customer-server/dao"
|
||
"customer-server/model/dto"
|
||
"customer-server/model/entity"
|
||
"encoding/base64"
|
||
"encoding/hex"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"strings"
|
||
|
||
commonMongo "gitea.com/red-future/common/db/mongo"
|
||
"gitea.com/red-future/common/jaeger"
|
||
"gitea.com/red-future/common/rabbitmq"
|
||
"gitea.com/red-future/common/redis"
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/os/gtime"
|
||
"github.com/gogf/gf/v2/util/gconv"
|
||
"go.mongodb.org/mongo-driver/v2/bson"
|
||
)
|
||
|
||
// Xiaohongshu is the package-level instance through which the Xiaohongshu
// integration methods in this file are invoked.
var Xiaohongshu = &xiaohongshu{}

// xiaohongshu is a field-less receiver type that groups the Xiaohongshu
// integration methods.
type xiaohongshu struct{}

// Constants shared by the Xiaohongshu integration.
const (
	// XhsApiBaseUrl is the base URL that Xiaohongshu open API paths are
	// appended to.
	XhsApiBaseUrl = "https://adapi.xiaohongshu.com"
	// XhsPlatformName is the platform identifier used in account filters,
	// session IDs and stored conversations.
	XhsPlatformName = "xiaohongshu"
	// XhsEncryptSplit separates the Base64 IV from the Base64 cipher text
	// inside an encrypted payload.
	XhsEncryptSplit = "~split~"
)
|
||
|
||
// ==================== 加解密工具 ====================
|
||
|
||
// Encrypt AES加密
|
||
// 参数: ctx - 上下文,content - 待加密内容,secretKey - 密钥(Base64编码)
|
||
// 返回: res - 加密后的字符串(Base64编码),err - 错误信息
|
||
// 功能: 使用AES-CBC模式加密内容,用于小红书API签名
|
||
func (s *xiaohongshu) Encrypt(ctx context.Context, content, secretKey string) (res string, err error) {
|
||
keyBytes, err := base64.StdEncoding.DecodeString(secretKey)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
block, err := aes.NewCipher(keyBytes)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
iv := make([]byte, aes.BlockSize)
|
||
if _, err = io.ReadFull(rand.Reader, iv); err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
stream := cipher.NewCBCEncrypter(block, iv)
|
||
contentBytes := []byte(content)
|
||
paddedContent := pkcs5Padding(contentBytes, aes.BlockSize)
|
||
cipherText := make([]byte, len(paddedContent))
|
||
stream.CryptBlocks(cipherText, paddedContent)
|
||
|
||
ivBase64 := base64.StdEncoding.EncodeToString(iv)
|
||
cipherBase64 := base64.StdEncoding.EncodeToString(cipherText)
|
||
res = fmt.Sprintf("%s%s%s", ivBase64, XhsEncryptSplit, cipherBase64)
|
||
return
|
||
}
|
||
|
||
func (s *xiaohongshu) Decrypt(ctx context.Context, cipherText, secretKey string) (res string, err error) {
|
||
keyBytes, err := base64.StdEncoding.DecodeString(secretKey)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
parts := strings.Split(cipherText, XhsEncryptSplit)
|
||
if len(parts) != 2 {
|
||
err = errors.New("invalid cipher text format")
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
iv, err := base64.StdEncoding.DecodeString(parts[0])
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
encrypted, err := base64.StdEncoding.DecodeString(parts[1])
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
block, err := aes.NewCipher(keyBytes)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
if len(encrypted)%aes.BlockSize != 0 {
|
||
err = errors.New("cipher text is not a multiple of block size")
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
stream := cipher.NewCBCDecrypter(block, iv)
|
||
decrypted := make([]byte, len(encrypted))
|
||
stream.CryptBlocks(decrypted, encrypted)
|
||
|
||
decrypted, err = pkcs5Unpadding(decrypted)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
res = string(decrypted)
|
||
return
|
||
}
|
||
|
||
// pkcs5Padding returns a copy of data extended with PKCS#5/7 padding so that
// its length is a multiple of blockSize.
//
// Between 1 and blockSize bytes are appended, each holding the number of
// padding bytes; input that is already block-aligned gains a full extra
// block. The result is always a newly allocated slice, so the caller's
// backing array is never written to, even when data has spare capacity.
// blockSize must be positive.
func pkcs5Padding(data []byte, blockSize int) []byte {
	padding := blockSize - len(data)%blockSize
	padded := make([]byte, len(data)+padding)
	n := copy(padded, data)
	for i := n; i < len(padded); i++ {
		padded[i] = byte(padding)
	}
	return padded
}
|
||
|
||
// pkcs5Unpadding strips PKCS#5/7 padding from data.
//
// The last byte gives the padding length n; the final n bytes must all equal
// n. An error is returned when data is empty, when n is zero or larger than
// len(data), or when any padding byte differs from n. Such input means the
// plain text is corrupt or was decrypted with the wrong key or IV. On success
// the returned slice aliases data.
func pkcs5Unpadding(data []byte) (res []byte, err error) {
	length := len(data)
	if length == 0 {
		return nil, errors.New("invalid padding size")
	}

	padding := int(data[length-1])
	// Valid padding is never zero bytes long and never exceeds the data.
	if padding == 0 || padding > length {
		return nil, errors.New("invalid padding size")
	}

	// Every padding byte must carry the padding length.
	for _, b := range data[length-padding:] {
		if int(b) != padding {
			return nil, errors.New("invalid padding bytes")
		}
	}

	return data[:length-padding], nil
}
|
||
|
||
// ==================== 账号绑定管理 ====================
|
||
|
||
func (s *xiaohongshu) HandleBindAccount(ctx context.Context, req *dto.XhsBindAccountReq) (err error) {
|
||
var account entity.CustomerServiceAccount
|
||
filter := bson.M{"platform": XhsPlatformName, "isDeleted": false}
|
||
if err = dao.MongoDAO.FindOne(ctx, filter, &account, entity.CustomerServiceAccountCollection); err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
decrypted, err := s.Decrypt(ctx, req.Content, account.SecretKey)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
var bindData dto.XhsBindAccountDecrypted
|
||
if err = json.Unmarshal([]byte(decrypted), &bindData); err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
update := bson.M{
|
||
"$set": bson.M{
|
||
"accessToken": bindData.Token,
|
||
"appId": bindData.AppId,
|
||
"xhsUserId": bindData.UserId,
|
||
"updatedAt": gtime.Now().Time,
|
||
},
|
||
}
|
||
filter = bson.M{"_id": account.Id}
|
||
_, err = commonMongo.GetDB().Collection(entity.CustomerServiceAccountCollection).UpdateOne(ctx, filter, update)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
g.Log().Infof(ctx, "[小红书] 绑定账户成功: userId=%s, nickName=%s", bindData.UserId, bindData.NickName)
|
||
return
|
||
}
|
||
|
||
func (s *xiaohongshu) HandleUnbindAccount(ctx context.Context, req *dto.XhsUnbindAccountReq) (err error) {
|
||
var account entity.CustomerServiceAccount
|
||
filter := bson.M{"platform": XhsPlatformName, "isDeleted": false}
|
||
if err = dao.MongoDAO.FindOne(ctx, filter, &account, entity.CustomerServiceAccountCollection); err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
decrypted, err := s.Decrypt(ctx, req.Content, account.SecretKey)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
var unbindData dto.XhsUnbindAccountDecrypted
|
||
if err = json.Unmarshal([]byte(decrypted), &unbindData); err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
update := bson.M{
|
||
"$set": bson.M{
|
||
"accessToken": "",
|
||
"xhsUserId": "",
|
||
"updatedAt": gtime.Now().Time,
|
||
},
|
||
}
|
||
filter = bson.M{"_id": account.Id}
|
||
_, err = commonMongo.GetDB().Collection(entity.CustomerServiceAccountCollection).UpdateOne(ctx, filter, update)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
g.Log().Infof(ctx, "[小红书] 解绑账户成功: userId=%s", unbindData.UserId)
|
||
return
|
||
}
|
||
|
||
// ==================== 消息收发 ====================
|
||
|
||
func (s *xiaohongshu) HandleReceiveMessage(ctx context.Context, req *dto.XhsReceiveMessageReq) (err error) {
|
||
accountId, err := s.getAccountIdByPlatform(ctx)
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
var account entity.CustomerServiceAccount
|
||
filter := bson.M{"_id": accountId, "isDeleted": false}
|
||
if err = dao.MongoDAO.FindOne(ctx, filter, &account, entity.CustomerServiceAccountCollection); err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
decrypted, err := s.Decrypt(ctx, req.Content, account.SecretKey)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
var conversation entity.Conversation
|
||
id := bson.NewObjectID()
|
||
conversation.Id = &id // 取地址赋值给指针类型
|
||
conversation.SessionId = fmt.Sprintf("%s_%s", req.FromUserId, XhsPlatformName)
|
||
conversation.UserId = req.FromUserId
|
||
conversation.CustomerServiceId = accountId.Hex()
|
||
conversation.Role = "user"
|
||
conversation.Platform = XhsPlatformName
|
||
conversation.MessageId = req.MessageId
|
||
conversation.MessageType = req.MessageType
|
||
now := gtime.Now().Time
|
||
conversation.CreatedAt = &now // 取地址赋值给指针类型
|
||
|
||
switch req.MessageType {
|
||
case "TEXT":
|
||
var textContent dto.XhsTextContent
|
||
if err = json.Unmarshal([]byte(decrypted), &textContent); err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
conversation.Content = textContent.Text
|
||
case "IMAGE":
|
||
var imgContent dto.XhsImageContent
|
||
if err = json.Unmarshal([]byte(decrypted), &imgContent); err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
conversation.Content = fmt.Sprintf("[图片]%s", imgContent.Link)
|
||
case "VIDEO":
|
||
var videoContent dto.XhsVideoContent
|
||
if err = json.Unmarshal([]byte(decrypted), &videoContent); err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
conversation.Content = fmt.Sprintf("[视频]%s", videoContent.Link)
|
||
case "CARD":
|
||
var cardContent dto.XhsCardContent
|
||
if err = json.Unmarshal([]byte(decrypted), &cardContent); err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
conversation.Content = fmt.Sprintf("[卡片-%s]%s", cardContent.ContentType, cardContent.Id)
|
||
case "REVOKE":
|
||
var revokeContent dto.XhsRevokeContent
|
||
if err = json.Unmarshal([]byte(decrypted), &revokeContent); err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
conversation.Content = fmt.Sprintf("[撤回消息]%s", revokeContent.MessageId)
|
||
case "HINT":
|
||
conversation.Content = "[系统提示消息]"
|
||
case "SMILES":
|
||
conversation.Content = "[表情消息]"
|
||
default:
|
||
conversation.Content = fmt.Sprintf("[%s类型消息]", req.MessageType)
|
||
}
|
||
|
||
_, err = commonMongo.GetDB().Collection(entity.ConversationCollection).InsertOne(ctx, conversation)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
g.Log().Infof(ctx, "[小红书] 接收消息成功: sessionId=%s, messageType=%s", conversation.SessionId, req.MessageType)
|
||
|
||
if req.MessageType == "TEXT" {
|
||
asyncCtx := context.WithoutCancel(ctx)
|
||
go s.processUserMessage(asyncCtx, &account, &conversation)
|
||
}
|
||
return
|
||
}
|
||
|
||
func (s *xiaohongshu) SendMessage(ctx context.Context, account *entity.CustomerServiceAccount, toUserId, content string) (err error) {
|
||
textContent := dto.XhsTextContent{Text: content}
|
||
contentJson, err := json.Marshal(textContent)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
encrypted, err := s.Encrypt(ctx, string(contentJson), account.SecretKey)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
now := gtime.Now()
|
||
sendReq := dto.XhsSendMessageReq{
|
||
UserId: account.XhsUserId,
|
||
RequestId: fmt.Sprintf("%d", now.UnixNano()),
|
||
MessageType: "TEXT",
|
||
FromUserId: account.XhsUserId,
|
||
ToUserId: toUserId,
|
||
ThirdAccountId: account.Id.Hex(),
|
||
Timestamp: now.UnixMilli(),
|
||
Content: encrypted,
|
||
}
|
||
|
||
url := fmt.Sprintf("%s/api/open/im/third/send", XhsApiBaseUrl)
|
||
client := g.Client()
|
||
client.SetHeader("Access-Token", account.AccessToken)
|
||
client.SetHeader("Content-Type", "application/json")
|
||
|
||
resp, err := client.Post(ctx, url, sendReq)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
defer resp.Close()
|
||
|
||
var sendRes dto.XhsSendMessageRes
|
||
if err = json.Unmarshal(resp.ReadAll(), &sendRes); err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
if sendRes.Code != 0 {
|
||
err = fmt.Errorf("发送消息失败: code=%d, msg=%s", sendRes.Code, sendRes.Msg)
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
var conversation entity.Conversation
|
||
id2 := bson.NewObjectID()
|
||
conversation.Id = &id2 // 取地址赋值给指针类型
|
||
conversation.SessionId = fmt.Sprintf("%s_%s", toUserId, XhsPlatformName)
|
||
conversation.UserId = toUserId
|
||
conversation.CustomerServiceId = account.Id.Hex()
|
||
conversation.Role = "assistant"
|
||
conversation.Platform = XhsPlatformName
|
||
conversation.MessageId = sendRes.Data.MessageId
|
||
conversation.MessageType = "TEXT"
|
||
conversation.Content = content
|
||
now2 := gtime.Now().Time
|
||
conversation.CreatedAt = &now2 // 取地址赋值给指针类型
|
||
|
||
_, err = commonMongo.GetDB().Collection(entity.ConversationCollection).InsertOne(ctx, conversation)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
g.Log().Infof(ctx, "[小红书] 发送消息成功: toUserId=%s, messageId=%s", toUserId, sendRes.Data.MessageId)
|
||
return
|
||
}
|
||
|
||
func (s *xiaohongshu) GenerateSignature(ctx context.Context, secretKey, requestBody string) (res string) {
|
||
h := sha256.New()
|
||
h.Write([]byte(secretKey + requestBody))
|
||
res = hex.EncodeToString(h.Sum(nil))
|
||
return
|
||
}
|
||
|
||
// ==================== 私有方法 ====================
|
||
|
||
func (s *xiaohongshu) getAccountIdByPlatform(ctx context.Context) (res bson.ObjectID, err error) {
|
||
var account entity.CustomerServiceAccount
|
||
filter := bson.M{"platform": XhsPlatformName, "isDeleted": false}
|
||
if err = dao.MongoDAO.FindOne(ctx, filter, &account, entity.CustomerServiceAccountCollection); err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
res = *account.Id // 解引用指针类型
|
||
return
|
||
}
|
||
|
||
func (s *xiaohongshu) processUserMessage(ctx context.Context, account *entity.CustomerServiceAccount, conversation *entity.Conversation) {
|
||
if err := s.sendToRAGFlowStream(ctx, account, conversation); err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
g.Log().Errorf(ctx, "[小红书] 发送到RAGFlow Stream失败: %v", err)
|
||
return
|
||
}
|
||
g.Log().Infof(ctx, "[小红书] 消息已发送到RAGFlow Stream: userId=%s", conversation.UserId)
|
||
}
|
||
|
||
func (s *xiaohongshu) sendToRAGFlowStream(ctx context.Context, account *entity.CustomerServiceAccount, conversation *entity.Conversation) (err error) {
|
||
baseQueue := GetConfigString(ctx, "rabbitmq.responseQueue")
|
||
replyQueue := rabbitmq.GetInstanceQueueName(baseQueue)
|
||
|
||
msg := &redis.SendStreamMessage{
|
||
UserId: fmt.Sprintf("%s_%s", XhsPlatformName, conversation.UserId),
|
||
TenantId: gconv.String(account.TenantId),
|
||
Content: conversation.Content,
|
||
Timestamp: gtime.New(conversation.CreatedAt).Timestamp(),
|
||
MessageId: conversation.MessageId,
|
||
Platform: XhsPlatformName,
|
||
AccountId: account.Id.Hex(),
|
||
AccountName: account.AccountName,
|
||
ReplyQueue: replyQueue,
|
||
}
|
||
|
||
if sessionId, _ := redis.GetSessionCache(ctx, gconv.String(account.TenantId), msg.UserId); sessionId == "" {
|
||
if history, histErr := dao.Conversation.GetRecentHistory(ctx, msg.UserId, redis.GetHistoryContextLimit()); histErr == nil && len(history) > 0 {
|
||
msg.History = history
|
||
g.Log().Infof(ctx, "[小红书] 用户已归档,读取 %d 轮历史对话", len(history))
|
||
}
|
||
}
|
||
|
||
streamMsgId, err := redis.AddToStream(ctx, redis.RAGFlowRequestStreamKey, msg)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err)
|
||
return
|
||
}
|
||
|
||
g.Log().Infof(ctx, "[小红书] 消息已写入Stream: streamMsgId=%s, sessionId=%s", streamMsgId, conversation.SessionId)
|
||
return
|
||
}
|