package session

import (
	"context"
	"fmt"

	"gitea.com/red-future/common/beans"
	"gitea.com/red-future/common/utils"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/util/gconv"

	"prompts-core/common/util"
	"prompts-core/dao"
	"prompts-core/model/dto"
	"prompts-core/model/entity"
)

// ============================================
// Callback persistence
// ============================================

// Callback stores the assistant reply delivered by a session callback.
//
// It tags req.Messages with role "assistant", writes it to the ComposeSession
// row identified by req.EpicycleId, reloads that row, and saves the resulting
// round to Redis.
//
// It returns an error when req carries no messages, when the DB update or
// lookup fails, when the row does not exist, or when the Redis write fails.
func Callback(ctx context.Context, req *dto.SessionCallbackReq) (*dto.SessionCallbackRes, error) {
	// Assigning into a nil map panics, so reject an empty payload up front.
	if req == nil || req.Messages == nil {
		return nil, fmt.Errorf("会话回调消息不能为空")
	}
	g.Log().Debugf(ctx, "[会话回调] 收到回调 req=%+v", req)

	req.Messages["role"] = "assistant"

	// 1) Update the DB row with the response content.
	_, err := dao.ComposeSession.Update(ctx, &entity.ComposeSession{
		SQLBaseDO:       beans.SQLBaseDO{Id: req.EpicycleId},
		ResponseContent: req.Messages,
	})
	if err != nil {
		g.Log().Errorf(ctx, "[会话回调] 更新数据库失败 epicycleId=%d err=%v", req.EpicycleId, err)
		return nil, fmt.Errorf("更新数据库失败: %w", err)
	}

	// 2) Reload the full record. A lookup failure and a missing row are
	// reported separately so the underlying DB error is not lost.
	session, err := dao.ComposeSession.Get(ctx, &entity.ComposeSession{
		SQLBaseDO: beans.SQLBaseDO{Id: req.EpicycleId},
	})
	if err != nil {
		return nil, fmt.Errorf("查询会话失败: epicycleId=%d: %w", req.EpicycleId, err)
	}
	if session == nil {
		return nil, fmt.Errorf("会话不存在: epicycleId=%d", req.EpicycleId)
	}

	// 3) entity -> HistoryRound -> Redis. The assistant side is taken from the
	// request rather than from the reloaded row.
	round := entityToHistoryRound(session)
	round.Assistant = req.Messages
	if err = SaveToRedis(ctx, session.TenantId, session.SessionId, round); err != nil {
		return nil, fmt.Errorf("redis存储失败: %w", err)
	}

	g.Log().Infof(ctx, "[会话回调] 存储成功 sessionId=%s id=%d", session.SessionId, session.Id)
	return &dto.SessionCallbackRes{Status: true, SessionId: session.SessionId}, nil
}

// ============================================
// Scenario 1: front-end history list (by creator)
// ============================================

// GetHistoryList returns one page of the current user's history rounds.
//
// Rows are filtered by the creator name taken from the user info in ctx and
// paged with req.Page / req.Size. The total is the value reported by the DAO.
func GetHistoryList(ctx context.Context, req *dto.GetHistoryListReq) (*dto.GetHistoryListRes, error) {
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return nil, err
	}

	filter := &entity.ComposeSession{
		SQLBaseDO: beans.SQLBaseDO{Creator: user.UserName},
	}
	sessions, total, err := dao.ComposeSession.List(ctx, filter, req.Page, req.Size)
	if err != nil {
		return nil, fmt.Errorf("DB获取历史列表失败: %w", err)
	}

	return &dto.GetHistoryListRes{
		List:  sessionsToHistoryRounds(sessions),
		Total: total,
	}, nil
}

// ============================================
// Scenario 2: prompt assembly (by sessionId + nodeId)
// ============================================

// GetHistoryMessages returns the flattened history messages of a session.
//
// Lookup order is Redis first, then the DB. When the DB is used, the rounds
// are written back to Redis in a background goroutine.
func GetHistoryMessages(ctx context.Context, req *dto.GetHistoryMessagesReq) (*dto.GetHistoryMessagesRes, error) {
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return nil, err
	}

	// 1) Redis. A read failure is not fatal: log it and fall through to the DB.
	cached, cacheErr := GetFromRedis(ctx, user.TenantId, req.SessionId)
	switch {
	case cacheErr != nil:
		g.Log().Debugf(ctx, "[历史消息] Redis读取失败 sessionId=%s err=%v", req.SessionId, cacheErr)
	case len(cached) > 0:
		g.Log().Debugf(ctx, "[历史消息] Redis命中 sessionId=%s count=%d", req.SessionId, len(cached))
		return &dto.GetHistoryMessagesRes{Messages: flattenRounds(cached)}, nil
	}

	// 2) DB: first page, with the configured maximum round count as page size.
	maxRounds := util.GetMaxRounds(ctx)
	sessions, _, err := dao.ComposeSession.List(ctx, &entity.ComposeSession{
		SQLBaseDO: beans.SQLBaseDO{Creator: user.UserName},
		SessionId: req.SessionId,
		NodeId:    req.NodeId,
	}, 1, maxRounds)
	if err != nil {
		return nil, fmt.Errorf("DB获取历史失败: %w", err)
	}
	if len(sessions) == 0 {
		// Explicit empty slice (not nil) is returned for the no-history case.
		return &dto.GetHistoryMessagesRes{Messages: []dto.FlatMessage{}}, nil
	}

	// 3) Convert, then re-seed Redis asynchronously. WithoutCancel detaches the
	// goroutine from the request's cancellation while keeping ctx values.
	rounds := sessionsToHistoryRounds(sessions)
	go asyncCacheToRedis(context.WithoutCancel(ctx), user.TenantId, req.SessionId, rounds)

	return &dto.GetHistoryMessagesRes{Messages: flattenRounds(rounds)}, nil
}

// ============================================
// Deletion
// ============================================

// DeleteMessages deletes the messages listed in req.MsgIds from the DB and
// then from Redis.
//
// The user is resolved before anything is deleted so that a user-lookup
// failure cannot leave the DB modified and Redis stale. Every id is attempted;
// Redis is purged only for ids whose DB row was deleted. If any DB delete
// fails, the first failure is returned together with Ok=false. A Redis purge
// failure is logged but does not fail the call.
func DeleteMessages(ctx context.Context, req *dto.DeleteMessagesReq) (*dto.DeleteMessagesRes, error) {
	if len(req.MsgIds) == 0 {
		return &dto.DeleteMessagesRes{Ok: false}, fmt.Errorf("msgIds不能为空")
	}

	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return nil, err
	}

	// 1) Delete from the DB, remembering which ids actually went through.
	// The zero-capacity reslice yields an empty slice of the same type whose
	// appends never write into req.MsgIds' backing array.
	deleted := req.MsgIds[:0:0]
	var firstErr error
	for _, id := range req.MsgIds {
		_, delErr := dao.ComposeSession.Delete(ctx, &entity.ComposeSession{
			SQLBaseDO: beans.SQLBaseDO{Id: id},
		})
		if delErr != nil {
			g.Log().Errorf(ctx, "[删除消息] DB删除失败 msgId=%v err=%v", id, delErr)
			if firstErr == nil {
				firstErr = delErr
			}
			continue
		}
		deleted = append(deleted, id)
	}

	// 2) Delete from Redis (best effort).
	if len(deleted) > 0 {
		if rdErr := DeleteRedisMessages(ctx, user.TenantId, req.SessionId, deleted); rdErr != nil {
			g.Log().Warningf(ctx, "[删除消息] Redis删除失败 sessionId=%s err=%v", req.SessionId, rdErr)
		}
	}

	if firstErr != nil {
		return &dto.DeleteMessagesRes{Ok: false}, fmt.Errorf("DB删除失败: %w", firstErr)
	}
	return &dto.DeleteMessagesRes{Ok: true}, nil
}

// DeleteSession deletes a whole session from the DB and then from Redis.
//
// An empty session id is rejected, and the user is resolved before the DB
// delete. A Redis failure is logged but does not fail the call.
//
// NOTE(review): the DB delete filters on SessionId only, not on tenant or
// creator — confirm whether the DAO scopes it elsewhere.
func DeleteSession(ctx context.Context, req *dto.DeleteSessionReq) (*dto.DeleteSessionRes, error) {
	// An empty id would leave the delete filter without any condition.
	if req.SessionId == "" {
		return nil, fmt.Errorf("sessionId不能为空")
	}

	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return nil, err
	}

	// 1) Delete from the DB.
	if _, err := dao.ComposeSession.Delete(ctx, &entity.ComposeSession{
		SessionId: req.SessionId,
	}); err != nil {
		return nil, fmt.Errorf("DB删除失败: %w", err)
	}

	// 2) Delete from Redis (best effort).
	if err := DeleteSessionHistory(ctx, user.TenantId, req.SessionId); err != nil {
		g.Log().Warningf(ctx, "[删除会话] Redis删除失败 sessionId=%s err=%v", req.SessionId, err)
	}

	return &dto.DeleteSessionRes{Ok: true}, nil
}

// ============================================
// Conversions (entity <-> dto, kept in one place)
// ============================================

// entityToHistoryRound converts a ComposeSession entity into a HistoryRound.
// Timestamps are rendered as strings via gconv.String. s must not be nil.
func entityToHistoryRound(s *entity.ComposeSession) *dto.HistoryRound {
	return &dto.HistoryRound{
		Id:        s.Id,
		SessionId: s.SessionId,
		NodeId:    s.NodeId,
		CreatedAt: gconv.String(s.CreatedAt),
		UpdatedAt: gconv.String(s.UpdatedAt),
		User:      s.RequestContent,
		Assistant: s.ResponseContent,
	}
}

// sessionsToHistoryRounds converts a batch of entities into HistoryRound
// values, preserving order. Nil entries are skipped rather than dereferenced.
func sessionsToHistoryRounds(sessions []*entity.ComposeSession) []dto.HistoryRound {
	rounds := make([]dto.HistoryRound, 0, len(sessions))
	for _, s := range sessions {
		if s == nil {
			continue
		}
		rounds = append(rounds, *entityToHistoryRound(s))
	}
	return rounds
}

// asyncCacheToRedis writes rounds to Redis one by one; it is started in its
// own goroutine by GetHistoryMessages. Rounds with neither a user nor an
// assistant part are skipped. A failed write is logged and the loop continues.
func asyncCacheToRedis(ctx context.Context, tenantID uint64, sessionID string, rounds []dto.HistoryRound) {
	for i := range rounds {
		if rounds[i].User == nil && rounds[i].Assistant == nil {
			continue
		}
		if err := SaveToRedis(ctx, tenantID, sessionID, &rounds[i]); err != nil {
			g.Log().Warningf(ctx, "[历史消息] Redis回种失败 sessionId=%s err=%v", sessionID, err)
		}
	}
}