Files
common/full-text-search/meilisearch/meilisearch.go
qhd bcbe6eba78 feat: 集成Eino文档解析与嵌入功能
新增Eino相关依赖,支持docx、pdf、xlsx等格式的文档加载与解析,并集成了Dashscope嵌入模型。同时修复了部分DAO查询中的OmitEmpty配置。
2026-03-28 18:24:15 +08:00

649 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// =============================================================================
// Meilisearch 业务操作封装
// 提供CRUD操作方法支持多数据源
// =============================================================================
package meilisearch
import (
"context"
"fmt"
"time"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
ms "github.com/meilisearch/meilisearch-go"
)
// =============================================================================
// 向后兼容的Meilisearch结构体
// =============================================================================
// meilisearchDB is a chainable handle for running Meilisearch operations
// against one named data source.
type meilisearchDB struct {
noCache bool // when true, Search and Get bypass the Redis cache
dataSource string // data source name; "default" when unset
}
// DB returns a fresh handle bound to the "default" data source with caching
// enabled.
//
// The variadic cache argument is accepted but currently ignored; call NoCache
// on the returned handle to disable caching.
func DB(cache ...bool) *meilisearchDB {
	db := new(meilisearchDB)
	db.dataSource = "default"
	db.noCache = false
	return db
}
// WithDataSource selects the data source used by subsequent operations and
// returns the receiver so calls can be chained.
func (m *meilisearchDB) WithDataSource(name string) *meilisearchDB {
	handle := m
	handle.dataSource = name
	return handle
}
// NoCache disables the Redis cache for subsequent operations on this handle
// and returns the receiver so calls can be chained.
func (m *meilisearchDB) NoCache() *meilisearchDB {
	handle := m
	handle.noCache = true
	return handle
}
// =============================================================================
// 全局变量
// =============================================================================
var (
// manager is the value returned by GetManager when the package is
// initialized; getDataSource uses it to look up data sources by name.
manager = GetManager()
)
// PageSize is the default number of hits requested per search when
// SearchParams.Limit is not positive.
const PageSize = 20
// =============================================================================
// Meilisearch 操作方法(支持多数据源)
// =============================================================================
// getDataSource resolves the data source configured on this handle through
// the package-level manager. An empty name is replaced by "default" (and
// stored back on the handle) before the lookup.
func (m *meilisearchDB) getDataSource() (DataSource, error) {
	const fallback = "default"
	if len(m.dataSource) == 0 {
		m.dataSource = fallback
	}
	return manager.GetDataSource(m.dataSource)
}
// getClient returns the Meilisearch client of the current data source.
// It fails when the data source cannot be resolved or when its client does
// not implement ms.ServiceManager.
func (m *meilisearchDB) getClient() (ms.ServiceManager, error) {
	source, err := m.getDataSource()
	if err != nil {
		return nil, err
	}
	client, ok := source.Client().(ms.ServiceManager)
	if !ok {
		return nil, fmt.Errorf("invalid client type")
	}
	return client, nil
}
// indexInterface is a small helper returning the index manager for indexName
// from the given client.
func indexInterface(indexName string, client ms.ServiceManager) ms.IndexManager {
	index := client.Index(indexName)
	return index
}
// ensureIndexExists makes sure indexName exists, creating it with primary key
// "id" when it cannot be fetched, and makes sure every attribute this package
// filters on is part of the index's filterable attributes.
//
// Index creation and settings updates are asynchronous Meilisearch tasks. Each
// one is awaited for at most 10 seconds, polling at a short interval. The
// duration argument of WaitForTask is the polling interval rather than a
// timeout, so the deadline is enforced through a context instead.
//
// Any FetchInfo error is treated as "index missing"; if the real cause is
// something else (for example a network failure), the subsequent CreateIndex
// call is expected to surface it.
func (m *meilisearchDB) ensureIndexExists(client ms.ServiceManager, indexName string) error {
	const (
		taskTimeout  = 10 * time.Second
		pollInterval = 100 * time.Millisecond
	)
	// waitForTask blocks until the task leaves the queue or taskTimeout elapses.
	waitForTask := func(taskUID int64) error {
		ctx, cancel := context.WithTimeout(context.Background(), taskTimeout)
		defer cancel()
		_, err := client.WaitForTaskWithContext(ctx, taskUID, pollInterval)
		return err
	}

	// Index() only builds a handle; it does not fail for a missing index.
	idx := client.Index(indexName)
	if _, err := idx.FetchInfo(); err != nil {
		task, err := client.CreateIndex(&ms.IndexConfig{
			Uid:        indexName,
			PrimaryKey: "id",
		})
		if err != nil {
			return err
		}
		if err = waitForTask(task.TaskUID); err != nil {
			return fmt.Errorf("等待索引创建失败: %w", err)
		}
		idx = client.Index(indexName)
	}

	settings, err := idx.GetSettings()
	if err != nil {
		return err
	}

	required := []string{"tenantId", "isDeleted", "datasetId", "creator", "updater"}

	// Build the de-duplicated union of the existing and required attributes,
	// keeping the existing ones first, and remember whether anything was added.
	seen := make(map[string]bool, len(settings.FilterableAttributes)+len(required))
	merged := make([]string, 0, len(settings.FilterableAttributes)+len(required))
	for _, attr := range settings.FilterableAttributes {
		if !seen[attr] {
			seen[attr] = true
			merged = append(merged, attr)
		}
	}
	missing := false
	for _, attr := range required {
		if !seen[attr] {
			seen[attr] = true
			merged = append(merged, attr)
			missing = true
		}
	}
	if !missing {
		return nil
	}

	task, err := idx.UpdateSettings(&ms.Settings{FilterableAttributes: merged})
	if err != nil {
		return err
	}
	if err = waitForTask(task.TaskUID); err != nil {
		return fmt.Errorf("等待设置更新失败: %w", err)
	}
	return nil
}
// buildSearchRequest translates searchParams into a Meilisearch search
// request for the user found in ctx.
//
// Pagination is expressed with Limit/Offset: SearchParams.Page is 1-based and
// is converted to an offset of (Page-1)*Limit. The request's Page field is
// deliberately left unset, because Meilisearch's own "page" parameter is
// 1-based and switches the response to hitsPerPage/totalHits mode, in which
// Limit is ignored and EstimatedTotalHits is not populated.
//
// The filter always contains "isDeleted = false"; it is prefixed with a
// tenantId clause when the user has a tenant and suffixed with
// searchParams.Filter when provided, all joined with AND.
//
// It returns an error when searchParams is nil or the user cannot be resolved.
func (m *meilisearchDB) buildSearchRequest(ctx context.Context, searchParams *SearchParams) (*ms.SearchRequest, error) {
	if searchParams == nil {
		return nil, fmt.Errorf("search params must not be nil")
	}
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return nil, err
	}

	limit := int64(PageSize)
	if searchParams.Limit > 0 {
		limit = int64(searchParams.Limit)
	}
	req := &ms.SearchRequest{
		Query: searchParams.Query,
		Limit: limit,
	}
	if searchParams.Page > 1 {
		req.Offset = int64(searchParams.Page-1) * limit
	}

	if len(searchParams.Sort) > 0 {
		req.Sort = searchParams.Sort
	}

	// Tenant isolation and soft-delete exclusion come first, then the
	// caller-supplied filter.
	filter := "isDeleted = false"
	if !g.IsEmpty(user.TenantId) {
		filter = fmt.Sprintf("tenantId = %s", gconv.String(user.TenantId)) + " AND " + filter
	}
	if searchParams.Filter != "" {
		filter += " AND " + searchParams.Filter
	}
	req.Filter = filter

	// NOTE(review): SearchableAttributes is passed as a single attribute name;
	// confirm callers never send a comma-separated list here.
	if searchParams.SearchableAttributes != "" {
		req.AttributesToSearchOn = []string{searchParams.SearchableAttributes}
	}
	if len(searchParams.AttributesToRetrieve) > 0 {
		req.AttributesToRetrieve = searchParams.AttributesToRetrieve
	}
	return req, nil
}
// Search runs searchParams against indexName and stores the hits in result.
//
// result may be a *[]map[string]interface{} (filled directly) or any target
// accepted by gconv.Structs. It is left untouched when there are no hits.
// The returned total is Meilisearch's estimated total hit count.
//
// If the index cannot be fetched (GetIndex returns any error), the search is
// treated as empty and (0, nil) is returned.
//
// Unless caching is disabled on the handle, results are read from and written
// to Redis under a key derived from the tenant, the index and the search
// parameters, with a one-hour expiry. Redis errors are returned to the caller.
func (m *meilisearchDB) Search(ctx context.Context, searchParams *SearchParams, indexName string, result interface{}) (total int64, err error) {
	// SetEX expects seconds; converting time.Hour directly would pass
	// nanoseconds and make the entry effectively permanent.
	const cacheTTLSeconds = int64(time.Hour / time.Second)

	client, err := m.getClient()
	if err != nil {
		return 0, err
	}
	if _, err = client.GetIndex(indexName); err != nil {
		return 0, nil
	}

	req, err := m.buildSearchRequest(ctx, searchParams)
	if err != nil {
		return 0, err
	}

	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return 0, err
	}
	cacheKey := fmt.Sprintf("meilisearch:search:%s:%s:%+v", user.TenantId, indexName, searchParams)

	// fill copies hits into the caller's result; no-op for an empty hit list.
	fill := func(hits []map[string]interface{}) error {
		if len(hits) == 0 {
			return nil
		}
		if out, ok := result.(*[]map[string]interface{}); ok {
			*out = hits
			return nil
		}
		return gconv.Structs(hits, result)
	}

	if !m.noCache {
		var cached *gvar.Var
		cached, err = g.Redis().Get(ctx, cacheKey)
		if err != nil {
			return 0, err
		}
		if !g.IsEmpty(cached) {
			entry := &SearchResult{}
			if err = gconv.Struct(cached, entry); err != nil {
				return 0, err
			}
			total = int64(entry.EstimatedTotalHits)
			return total, fill(entry.Hits)
		}
	}

	searchResp, err := indexInterface(indexName, client).Search(searchParams.Query, req)
	if err != nil {
		return 0, err
	}
	total = int64(searchResp.EstimatedTotalHits)

	// Convert the hits once; the same slice feeds both the caller's result
	// and the cache entry.
	hits := make([]map[string]interface{}, 0, len(searchResp.Hits))
	for _, hit := range searchResp.Hits {
		hitMap := gconv.Map(hit)
		// Drop Meilisearch's internal formatting payload.
		delete(hitMap, "_formatted")
		hits = append(hits, hitMap)
	}
	if err = fill(hits); err != nil {
		return total, err
	}

	if !m.noCache {
		entry := &SearchResult{
			Hits:               hits,
			EstimatedTotalHits: searchResp.EstimatedTotalHits,
			Limit:              int(searchResp.Limit),
			Offset:             int(searchResp.Offset),
			ProcessingTimeMs:   int(searchResp.ProcessingTimeMs),
		}
		if err = g.Redis().SetEX(ctx, cacheKey, entry, cacheTTLSeconds); err != nil {
			return total, err
		}
	}
	return total, nil
}
// Insert adds a single document to indexName, creating the index first when
// necessary, and returns the UID of the enqueued Meilisearch task.
//
// The document is converted to a map and the following fields are filled in
// when they are empty: tenantId, creator and updater (from the user in ctx,
// when the user has them), createdAt and updatedAt (current Unix seconds) and
// isDeleted (false).
//
// A document that cannot be converted to a map is rejected with an error
// before any index is created. Failure to clear the search cache afterwards
// is only logged.
//
// NOTE(review): when document is already a map, gconv.Map may return that
// same map, in which case the defaults are written into the caller's value.
func (m *meilisearchDB) Insert(ctx context.Context, document interface{}, indexName string) (taskUID int64, err error) {
	c, err := m.getClient()
	if err != nil {
		return 0, err
	}

	docMap := gconv.Map(document)
	if docMap == nil {
		// Writing defaults into a nil map would panic.
		return 0, fmt.Errorf("invalid document: cannot convert %T to a map", document)
	}

	if err = m.ensureIndexExists(c, indexName); err != nil {
		return 0, err
	}
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return 0, err
	}

	// setIfEmpty writes value only when the document has no usable value yet.
	setIfEmpty := func(key string, value interface{}) {
		if g.IsEmpty(docMap[key]) {
			docMap[key] = value
		}
	}
	if !g.IsEmpty(user.TenantId) {
		setIfEmpty("tenantId", user.TenantId)
	}
	if !g.IsEmpty(user.UserName) {
		setIfEmpty("creator", user.UserName)
		setIfEmpty("updater", user.UserName)
	}
	now := gtime.Now().Unix()
	setIfEmpty("createdAt", now)
	setIfEmpty("updatedAt", now)
	setIfEmpty("isDeleted", false)

	task, err := indexInterface(indexName, c).AddDocuments([]map[string]interface{}{docMap}, nil)
	if err != nil {
		return 0, err
	}

	// Cache invalidation is best effort.
	if cleanErr := m.cleanCache(ctx, indexName, user.TenantId); cleanErr != nil {
		glog.Warning(ctx, "清理Redis缓存失败:", cleanErr)
	}
	return task.TaskUID, nil
}
// InsertMany adds all documents to indexName in one Meilisearch request,
// creating the index first when necessary, and returns the UID of the enqueued
// task.
//
// Each document is converted to a map and receives the same defaults as in
// Insert: tenantId, creator, updater, createdAt, updatedAt and isDeleted are
// filled in when empty. All documents of one call share the same timestamp.
//
// If any element cannot be converted to a map the whole call is rejected with
// an error naming its position, before any index is created. Failure to clear
// the search cache afterwards is only logged.
func (m *meilisearchDB) InsertMany(ctx context.Context, documents []interface{}, indexName string) (taskUID int64, err error) {
	c, err := m.getClient()
	if err != nil {
		return 0, err
	}
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return 0, err
	}

	hasTenant := !g.IsEmpty(user.TenantId)
	hasUserName := !g.IsEmpty(user.UserName)
	now := gtime.Now().Unix()

	docs := make([]map[string]interface{}, 0, len(documents))
	for i, document := range documents {
		docMap := gconv.Map(document)
		if docMap == nil {
			// Writing defaults into a nil map would panic.
			return 0, fmt.Errorf("invalid document at position %d: cannot convert %T to a map", i, document)
		}
		if hasTenant && g.IsEmpty(docMap["tenantId"]) {
			docMap["tenantId"] = user.TenantId
		}
		if hasUserName && g.IsEmpty(docMap["creator"]) {
			docMap["creator"] = user.UserName
		}
		if hasUserName && g.IsEmpty(docMap["updater"]) {
			docMap["updater"] = user.UserName
		}
		if g.IsEmpty(docMap["createdAt"]) {
			docMap["createdAt"] = now
		}
		if g.IsEmpty(docMap["updatedAt"]) {
			docMap["updatedAt"] = now
		}
		if g.IsEmpty(docMap["isDeleted"]) {
			docMap["isDeleted"] = false
		}
		docs = append(docs, docMap)
	}

	if err = m.ensureIndexExists(c, indexName); err != nil {
		return 0, err
	}

	task, err := indexInterface(indexName, c).AddDocuments(docs, nil)
	if err != nil {
		return 0, err
	}

	// Cache invalidation is best effort.
	if cleanErr := m.cleanCache(ctx, indexName, user.TenantId); cleanErr != nil {
		glog.Warning(ctx, "清理Redis缓存失败:", cleanErr)
	}
	return task.TaskUID, nil
}
// Update sends document to indexName as a partial update (UpdateDocuments)
// and returns the UID of the enqueued Meilisearch task.
//
// The document is converted to a map; updater is filled from the user in ctx
// when empty and updatedAt is always set to the current Unix time. A document
// that cannot be converted to a map is rejected with an error. Failure to
// clear the search cache afterwards is only logged.
func (m *meilisearchDB) Update(ctx context.Context, document interface{}, indexName string) (taskUID int64, err error) {
	c, err := m.getClient()
	if err != nil {
		return 0, err
	}
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return 0, err
	}

	docMap := gconv.Map(document)
	if docMap == nil {
		// Writing updatedAt into a nil map would panic.
		return 0, fmt.Errorf("invalid document: cannot convert %T to a map", document)
	}
	if !g.IsEmpty(user.UserName) && g.IsEmpty(docMap["updater"]) {
		docMap["updater"] = user.UserName
	}
	docMap["updatedAt"] = gtime.Now().Unix()

	task, err := indexInterface(indexName, c).UpdateDocuments([]map[string]interface{}{docMap}, nil)
	if err != nil {
		return 0, err
	}

	// Cache invalidation is best effort.
	if cleanErr := m.cleanCache(ctx, indexName, user.TenantId); cleanErr != nil {
		glog.Warning(ctx, "清理Redis缓存失败:", cleanErr)
	}
	return task.TaskUID, nil
}
// Delete permanently removes the document with the given id from indexName
// and returns the UID of the enqueued Meilisearch task.
//
// The user is resolved from ctx before the delete is issued, so a missing
// user aborts the call without touching the index instead of reporting an
// error after the document has already been queued for deletion. Failure to
// clear the cache afterwards is only logged.
//
// NOTE(review): the document's tenant is not checked against the caller's
// tenant before deletion.
func (m *meilisearchDB) Delete(ctx context.Context, id string, indexName string) (taskUID int64, err error) {
	c, err := m.getClient()
	if err != nil {
		return 0, err
	}
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return 0, err
	}

	task, err := indexInterface(indexName, c).DeleteDocument(id, nil)
	if err != nil {
		return 0, err
	}

	// Cache invalidation is best effort.
	if cleanErr := m.cleanCache(ctx, indexName, user.TenantId); cleanErr != nil {
		glog.Warning(ctx, "清理Redis缓存失败:", cleanErr)
	}
	return task.TaskUID, nil
}
// DeleteSoft marks the document with the given id as deleted instead of
// removing it: it sends a partial update setting isDeleted to true together
// with the updater name and the current Unix time as updatedAt. It returns the
// UID of the enqueued Meilisearch task. Failure to clear the cache afterwards
// is only logged.
func (m *meilisearchDB) DeleteSoft(ctx context.Context, id string, indexName string) (taskUID int64, err error) {
	client, err := m.getClient()
	if err != nil {
		return 0, err
	}
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return 0, err
	}

	// Only the fields that change are sent; UpdateDocuments merges them into
	// the stored document identified by "id".
	patch := make(map[string]interface{}, 4)
	patch["id"] = id
	patch["isDeleted"] = true
	patch["updater"] = user.UserName
	patch["updatedAt"] = gtime.Now().Unix()

	task, err := indexInterface(indexName, client).UpdateDocuments([]map[string]interface{}{patch}, nil)
	if err != nil {
		return 0, err
	}

	if cleanErr := m.cleanCache(ctx, indexName, user.TenantId); cleanErr != nil {
		glog.Warning(ctx, "清理Redis缓存失败:", cleanErr)
	}
	return task.TaskUID, nil
}
// Get loads the document with the given id from indexName into result.
//
// Unless caching is disabled on the handle, the document is first looked up
// in Redis under a key built from the tenant, index and id; on a miss it is
// fetched from Meilisearch and cached for one hour. Soft-deleted documents
// (isDeleted is true) are reported as not found. Redis errors are returned to
// the caller.
//
// NOTE(review): the fetched document's tenantId is not compared with the
// caller's tenant, so any id in the index can be read; confirm this is
// intended.
func (m *meilisearchDB) Get(ctx context.Context, id string, indexName string, result interface{}) (err error) {
	// SetEX expects seconds; converting time.Hour directly would pass
	// nanoseconds and make the entry effectively permanent.
	const cacheTTLSeconds = int64(time.Hour / time.Second)

	c, err := m.getClient()
	if err != nil {
		return err
	}
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return err
	}
	cacheKey := fmt.Sprintf("meilisearch:doc:%s:%s:%s", user.TenantId, indexName, id)

	if !m.noCache {
		var cached *gvar.Var
		cached, err = g.Redis().Get(ctx, cacheKey)
		if err != nil {
			return err
		}
		if !g.IsEmpty(cached) {
			return gconv.Scan(cached, result)
		}
	}

	var doc map[string]interface{}
	if err = indexInterface(indexName, c).GetDocument(id, nil, &doc); err != nil {
		return err
	}
	if gconv.Bool(doc["isDeleted"]) {
		return gerror.New("文档不存在")
	}
	if err = gconv.Struct(doc, result); err != nil {
		return err
	}

	if !m.noCache {
		if err = g.Redis().SetEX(ctx, cacheKey, result, cacheTTLSeconds); err != nil {
			return err
		}
	}
	return nil
}
// cleanCache removes every cached entry of the given tenant and index from
// Redis: the search results written by Search ("meilisearch:search:...") and
// the single documents written by Get ("meilisearch:doc:..."). Purging the
// document entries as well keeps Get from serving stale or deleted documents
// after a write.
//
// Keys are looked up with KEYS, which scans the whole keyspace; the matching
// keys of each pattern are then removed with a single DEL.
//
// NOTE(review): Meilisearch applies writes asynchronously, so a read issued
// between this purge and task completion can re-cache the old state.
func (m *meilisearchDB) cleanCache(ctx context.Context, indexName string, tenantId interface{}) error {
	patterns := []string{
		fmt.Sprintf("meilisearch:search:%s:%s:*", tenantId, indexName),
		fmt.Sprintf("meilisearch:doc:%s:%s:*", tenantId, indexName),
	}
	for _, pattern := range patterns {
		keys, err := g.Redis().Keys(ctx, pattern)
		if err != nil {
			return err
		}
		// DEL requires at least one key.
		if len(keys) == 0 {
			continue
		}
		if _, err = g.Redis().Del(ctx, keys...); err != nil {
			return err
		}
	}
	return nil
}
// GetClient exposes the underlying Meilisearch client of the current data
// source for operations this wrapper does not cover.
func (m *meilisearchDB) GetClient() (ms.ServiceManager, error) {
	client, err := m.getClient()
	return client, err
}