17 Commits

Author SHA1 Message Date
qhd
06deed7ade feat: go.mod 2026-04-02 17:28:27 +08:00
qhd
62a09c0e49 feat: 去掉跨域,在网关统一做跨域处理 2026-04-02 16:37:32 +08:00
qhd
0c2e36f607 feat: gfdb增加noTenantId 2026-04-02 10:37:31 +08:00
b1c62b3f16 mongo初始化控制台打印优化 2026-04-02 09:12:59 +08:00
81d8b70f0d Dockerfile 2026-04-01 16:18:04 +08:00
41b2a37fc0 Dockerfile 2026-04-01 14:19:50 +08:00
0ad6bc9438 Dockerfile 2026-04-01 14:11:07 +08:00
qhd
7d2dec86b4 feat: 添加基于Consul的RPC客户端和服务端 2026-04-01 13:39:32 +08:00
qhd
78969f5e6a Merge branch 'dev' of http://116.204.74.41:3000/red-future/common into dev 2026-04-01 13:38:57 +08:00
qhd
2e4a0a89f1 feat: 支持多模型提供商 embedding 2026-04-01 13:38:33 +08:00
3798b90447 Dockerfile 2026-04-01 11:37:43 +08:00
qhd
bcbe6eba78 feat: 集成Eino文档解析与嵌入功能
新增Eino相关依赖,支持docx、pdf、xlsx等格式的文档加载与解析,并集成了Dashscope嵌入模型。同时修复了部分DAO查询中的OmitEmpty配置。
2026-03-28 18:24:15 +08:00
qhd
f85314f119 refactor: 优化数据库查询构建链式调用 2026-03-28 11:10:02 +08:00
qhd
ac6beab363 refactor: 优化数据库查询构建链式调用 2026-03-27 14:55:44 +08:00
qhd
0a38df71c9 refactor: 将分布式锁从 redis 迁移至 utils 包 2026-03-27 09:49:43 +08:00
qhd
3cf301275f feat: 添加租户ID字段并重构文件上传逻辑 2026-03-24 16:17:22 +08:00
qhd
bb88eb1e47 style: 调整 minio.go 导入顺序 2026-03-24 10:33:47 +08:00
23 changed files with 1934 additions and 708 deletions

View File

@@ -41,6 +41,7 @@ type MongoBaseDO struct {
// SQLBaseDO SQL数据库基础实体
type SQLBaseDO struct {
Id int64 `orm:"id" json:"id"` // 主键ID
TenantId uint64 `orm:"tenant_id" json:"tenantId"` // 租户ID
Creator string `orm:"creator" json:"creator"` // 创建人
CreatedAt *gtime.Time `orm:"created_at" json:"createdAt"` // 创建时间
Updater string `orm:"updater" json:"updater"` // 更新人
@@ -51,6 +52,7 @@ type SQLBaseDO struct {
type SQLBaseCol struct {
Id string
TenantId string
Creator string
CreatedAt string
Updater string
@@ -61,6 +63,7 @@ type SQLBaseCol struct {
var DefSQLBaseCol = SQLBaseCol{
Id: "id",
TenantId: "tenant_id",
Creator: "creator",
CreatedAt: "created_at",
Updater: "updater",

View File

@@ -5,7 +5,9 @@ import (
"errors"
"fmt"
"net"
"net/http"
"sync"
"time"
"github.com/gogf/gf/contrib/registry/consul/v2"
"github.com/gogf/gf/v2/frame/g"
@@ -14,30 +16,125 @@ import (
"github.com/gogf/gf/v2/util/grand"
)
var initOnce sync.Once
var (
registry gsvc.Registry
consulAddr string
reconnectMutex sync.RWMutex
reconnectDone chan struct{}
connected bool
httpClient *http.Client
)
// Init 初始化 Consul 注册中心(延迟初始化,首次调用时执行)
func Init() {
initOnce.Do(func() {
consulAddr := g.Cfg().MustGet(context.Background(), "consul.address").String()
if consulAddr == "" {
g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化")
// connectConsul 连接 Consul
func connectConsul(ctx context.Context) error {
reconnectMutex.Lock()
defer reconnectMutex.Unlock()
// 如果已经连接,不再重复连接
if connected && registry != nil {
return nil
}
var err error
registry, err = consul.New(consul.WithAddress(consulAddr))
if err != nil {
g.Log().Errorf(ctx, "❌ Consul 连接失败: %v", err)
return err
}
gsvc.SetRegistry(registry)
gsel.SetBuilder(gsel.NewBuilderRoundRobin())
connected = true
g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr)
return nil
}
// startHealthCheckAndReconnect 启动健康检查和自动重连
func startHealthCheckAndReconnect() {
if reconnectDone != nil {
close(reconnectDone)
}
reconnectDone = make(chan struct{})
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
ctx := context.Background()
// 初始化HTTP客户端用于健康检查
httpClient = &http.Client{
Timeout: 5 * time.Second,
}
for {
select {
case <-ticker.C:
// 检查服务发现是否正常工作
if checkConsulHealth(ctx) {
continue
}
g.Log().Warning(ctx, "⚠️ Consul 连接异常,尝试重新连接...")
// 重置连接状态并重连
reconnectMutex.Lock()
connected = false
registry = nil
reconnectMutex.Unlock()
if err := connectConsul(ctx); err != nil {
g.Log().Errorf(ctx, "❌ Consul 重连失败: %v,30秒后重试...", err)
}
case <-reconnectDone:
g.Log().Info(ctx, "🛑 Consul 健康检查已停止")
return
}
registry, err := consul.New(consul.WithAddress(consulAddr))
if err != nil {
g.Log().Errorf(context.Background(), "Consul 初始化失败: %v", err)
return
}
gsvc.SetRegistry(registry)
gsel.SetBuilder(gsel.NewBuilderRoundRobin())
g.Log().Infof(context.Background(), "✅ Consul 初始化成功: %s", consulAddr)
})
}
}
// checkConsulHealth 检查 Consul 健康状态
func checkConsulHealth(ctx context.Context) bool {
reconnectMutex.RLock()
defer reconnectMutex.RUnlock()
if registry == nil || !connected {
return false
}
// 使用consul原生API进行健康检查
// 调用 /v1/agent/self 接口检测连接状态
url := fmt.Sprintf("http://%s/v1/agent/self", consulAddr)
resp, err := httpClient.Get(url)
if err != nil {
g.Log().Debugf(ctx, "Consul 健康检查失败: %v", err)
return false
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
g.Log().Debugf(ctx, "Consul 健康检查失败,状态码: %d", resp.StatusCode)
return false
}
//g.Log().Debugf(ctx, "✅ Consul 健康检查通过")
return true
}
func init() {
// 默认自动初始化(保持向后兼容)
Init()
consulAddr = g.Cfg().MustGet(context.Background(), "consul.address").String()
if consulAddr == "" {
g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化")
return
}
if err := connectConsul(context.Background()); err != nil {
g.Log().Errorf(context.Background(), "❌ Consul 初始化失败: %v", err)
} else {
// 连接成功后启动健康检查和自动重连
go startHealthCheckAndReconnect()
}
}
func getLocalIP() (string, error) {
// 获取本机所有网络接口
@@ -69,9 +166,14 @@ func getInstanceAddrByIp(ctx context.Context, ip string, services []gsvc.Service
}
func GetInstanceAddr(ctx context.Context, name string) (addr string, err error) {
watch, err := gsvc.GetRegistry().Watch(ctx, name)
if err != nil {
err = errors.New("获取服务监听器失败")
return
}
service, err := watch.Proceed()
if err != nil || service == nil {
err = errors.New("获取customerService服务实例失败")
err = errors.New("获取服务实例失败")
return
}
//优先使用客户端IP获取实例(前后端在同一台机器调试)

View File

@@ -9,6 +9,7 @@ import (
"strings"
"time"
"gitea.com/red-future/common/beans"
"gitea.com/red-future/common/utils"
"github.com/bwmarrin/snowflake"
"github.com/gogf/gf/v2/crypto/gmd5"
@@ -173,12 +174,25 @@ func insertHook(ctx context.Context, in *gdb.HookInsertInput) (result sql.Result
if _, ok := in.Data[i]["id"]; ok {
in.Data[i]["id"] = node.Generate().Int64()
}
if !g.IsEmpty(userInfo.UserName) {
if _, ok := in.Data[i]["creator"]; ok {
in.Data[i]["creator"] = userInfo.UserName
if _, ok := in.Data[i]["tenant_id"]; ok {
if !g.IsEmpty(userInfo.TenantId) {
in.Data[i]["tenant_id"] = userInfo.TenantId
} else {
return nil, fmt.Errorf("tenantId cannot be empty")
}
if _, ok := in.Data[i]["updater"]; ok {
}
if _, ok := in.Data[i]["creator"]; ok {
if !g.IsEmpty(userInfo.UserName) {
in.Data[i]["creator"] = userInfo.UserName
} else {
return nil, fmt.Errorf("user info cannot be empty")
}
}
if _, ok := in.Data[i]["updater"]; ok {
if !g.IsEmpty(userInfo.UserName) {
in.Data[i]["updater"] = userInfo.UserName
} else {
return nil, fmt.Errorf("user info cannot be empty")
}
}
}
@@ -207,16 +221,22 @@ func updateHook(ctx context.Context, in *gdb.HookUpdateInput) (result sql.Result
switch data := in.Data.(type) {
case gdb.Map:
if !g.IsEmpty(userInfo.UserName) {
if _, ok := data["updater"]; ok {
if _, ok := data["updater"]; ok {
if !g.IsEmpty(userInfo.UserName) {
data["updater"] = userInfo.UserName
} else {
return nil, fmt.Errorf("user info cannot be empty")
}
}
case gdb.List:
for i := range data {
if !g.IsEmpty(userInfo.UserName) {
if _, ok := data[i]["updater"]; ok {
data[i]["updater"] = userInfo.UserName
if !g.IsEmpty(userInfo.UserName) {
data[i]["updater"] = userInfo.UserName
} else {
return nil, fmt.Errorf("user info cannot be empty")
}
}
}
}
@@ -261,11 +281,48 @@ func deleteHook(ctx context.Context, in *gdb.HookDeleteInput) (result sql.Result
// ==================== Select钩子缓存读取 ====================
func selectHook(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result, err error) {
traceID := getTraceID(ctx)
var tenantId uint64
// ===================== 最终版安全追加租户ID =====================
tenantEnabled, err := gcache.Get(ctx, getTraceID(ctx, noTenantIdKeyPrefix))
if err != nil {
return
}
if !gconv.Bool(tenantEnabled) {
user, err := utils.GetUserInfo(ctx)
if err != nil {
return nil, err
}
tenantId = user.TenantId
// 【关键修复】找到 SQL 中第一个出现的 ORDER BY / GROUP BY / LIMIT 等关键字位置
sql := in.Sql
insertPos := len(sql)
keywords := []string{" ORDER BY ", " GROUP BY ", " HAVING ", " LIMIT ", " FOR UPDATE"}
for _, kw := range keywords {
if idx := gstr.PosI(sql, kw); idx != -1 {
insertPos = idx
break
}
}
enabled, err := gcache.Get(ctx, traceID)
// 【正确拼接】把条件插入到关键字之前,而不是直接拼在最后
condition := " " + beans.DefSQLBaseCol.TenantId + " = ?"
if gstr.Contains(gstr.ToUpper(sql), " WHERE ") {
// 有 WHERE → 加 AND
in.Sql = sql[:insertPos] + " AND" + condition + sql[insertPos:]
} else {
// 无 WHERE → 加 WHERE
in.Sql = sql[:insertPos] + " WHERE" + condition + sql[insertPos:]
}
in.Args = append(in.Args, tenantId)
}
// ==================================================================
cacheEnabled, err := gcache.Get(ctx, getTraceID(ctx, cacheKeyPrefix))
if err != nil {
return
}
// 未启用缓存,直接执行查询
if !gconv.Bool(enabled) {
if !gconv.Bool(cacheEnabled) {
return in.Next(ctx)
}
@@ -284,18 +341,13 @@ func selectHook(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result
}
}
user, err := utils.GetUserInfo(ctx)
if err != nil {
return nil, err
}
encrypt, err := gmd5.Encrypt(fmt.Sprintf("%s:%s", whereCondition, in.Args))
if err != nil {
return nil, err
}
// 构建缓存keysql:tenantId:table:where条件:args
cacheKey := fmt.Sprintf("%s:%s:%s", getCacheKey(user.TenantId, in.Table, false), getSelectTypeString(in.SelectType), encrypt)
cacheKey := fmt.Sprintf("%s:%s:%s", getCacheKey(tenantId, in.Table, false), getSelectTypeString(in.SelectType), encrypt)
// 1. 先查缓存
if data, ok := getFromCache(ctx, cacheKey); ok {
@@ -324,7 +376,12 @@ func selectHook(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result
}
func getCacheKey(tenantId uint64, table string, isBlur bool) string {
cacheKey := fmt.Sprintf("sql:tenantId-%v:%s", tenantId, table)
var cacheKey string
if g.IsEmpty(tenantId) {
cacheKey = fmt.Sprintf("sql:%s", table)
} else {
cacheKey = fmt.Sprintf("sql:tenantId-%v:%s", tenantId, table)
}
if isBlur {
cacheKey = fmt.Sprintf("%s:*", cacheKey)
}
@@ -350,10 +407,13 @@ func getSelectTypeString(selectType gdb.SelectType) string {
// ==================== 调用方法 ====================
var (
schemaPrefix = "tenant-"
schemaPrefix = "tenant-"
cacheKeyPrefix = "cache-"
noTenantIdKeyPrefix = "tenantId-"
)
type Gfdb interface {
Exec(ctx context.Context, sql string, args ...any) (sql.Result, error)
Model(ctx context.Context, tableNameOrStruct ...any) *model
Transaction(ctx context.Context, f func(ctx context.Context, tx gdb.TX) error) error
}
@@ -362,25 +422,43 @@ type cache interface {
Cache(ctx context.Context) *gdb.Model
}
type model struct {
*gdb.Model
type noTenantId interface {
NoTenantId(ctx context.Context) *modelCache
}
type dataBase struct {
gdb.DB
}
func DB(ctx context.Context) Gfdb {
var dbName []string
type model struct {
*gdb.Model
}
type modelCache struct {
*model
}
func checkSchemaConfig(ctx context.Context) (uint64, bool) {
user, err := utils.GetUserInfo(ctx)
if err != nil {
glog.Errorf(ctx, "[DB] GetUserInfo error: %v", err)
return nil
return 0, false
}
var schema = fmt.Sprintf("%s%v", schemaPrefix, user.TenantId)
sprintf := fmt.Sprintf("database.%s", schema)
if !g.Cfg().MustGet(ctx, sprintf).IsEmpty() {
return user.TenantId, true
}
return user.TenantId, false
}
func DB(ctx context.Context) Gfdb {
tenantId, config := checkSchemaConfig(ctx)
var schema = fmt.Sprintf("%s%v", schemaPrefix, tenantId)
var dbName []string
if config {
dbName = append(dbName, schema)
} else {
dbName = append(dbName, "default")
@@ -399,29 +477,24 @@ func DB(ctx context.Context) Gfdb {
}
func (d *dataBase) Model(ctx context.Context, tableNameOrStruct ...any) *model {
user, err := utils.GetUserInfo(ctx)
if err != nil {
glog.Errorf(ctx, "[DB] GetUserInfo error: %v", err)
return nil
}
m := d.DB.Model(tableNameOrStruct...).Ctx(ctx)
var schema = fmt.Sprintf("%s%v", schemaPrefix, user.TenantId)
sprintf := fmt.Sprintf("database.%s", schema)
if !g.Cfg().MustGet(ctx, sprintf).IsEmpty() {
tenantId, config := checkSchemaConfig(ctx)
if config {
// 创建按地区分库的配置
shardingConfig := gdb.ShardingConfig{
Schema: gdb.ShardingSchemaConfig{
Enable: true, // 启用分库
Prefix: schemaPrefix, // 分库前缀
Rule: &RegionShardingRule{RegionMapping: user.TenantId}, // 自定义分库规则
Enable: true, // 启用分库
Prefix: schemaPrefix, // 分库前缀
Rule: &RegionShardingRule{RegionMapping: tenantId}, // 自定义分库规则
},
}
m.Sharding(shardingConfig).ShardingValue(user.TenantId)
m.Sharding(shardingConfig).ShardingValue(tenantId)
}
m.OmitNilData().OmitNilWhere().Hook(catchSQLHook())
m.OmitNil().Hook(catchSQLHook())
return &model{
Model: m,
}
@@ -432,7 +505,7 @@ func (d *dataBase) Transaction(ctx context.Context, f func(ctx context.Context,
}
func (d *model) Cache(ctx context.Context) *gdb.Model {
traceID := getTraceID(ctx)
traceID := getTraceID(ctx, cacheKeyPrefix)
if traceID == "" {
glog.Errorf(ctx, "[DB] GetTraceID error: traceID is empty")
return nil
@@ -444,11 +517,28 @@ func (d *model) Cache(ctx context.Context) *gdb.Model {
return d.Model
}
func (d *model) NoTenantId(ctx context.Context) *modelCache {
traceID := getTraceID(ctx, noTenantIdKeyPrefix)
if traceID == "" {
glog.Errorf(ctx, "[DB] GetTraceID error: traceID is empty")
return nil
}
if err := gcache.Set(ctx, traceID, true, time.Second); err != nil {
glog.Errorf(ctx, "[DB] Cache error: %v", err)
return nil
}
return &modelCache{
&model{
Model: d.Model,
},
}
}
// getTraceID 从 context 中获取链路追踪 ID
func getTraceID(ctx context.Context) string {
func getTraceID(ctx context.Context, prefix string) string {
span := trace.SpanFromContext(ctx)
if span != nil && span.SpanContext().HasTraceID() {
return span.SpanContext().TraceID().String()
return fmt.Sprintf("%s%v", prefix, span.SpanContext().TraceID().String())
}
return ""
}
@@ -475,3 +565,19 @@ func (r *RegionShardingRule) TableName(ctx context.Context, config gdb.ShardingT
// 这里不实现分表,返回空字符串
return "", nil
}
func GetTablePrefix(ctx context.Context) (prefix string, err error) {
tenantId, config := checkSchemaConfig(ctx)
if config {
sprintf := fmt.Sprintf("database.%s%v.0.prefix", schemaPrefix, tenantId)
prefix = g.Cfg().MustGet(ctx, sprintf).String()
return
}
defaultConfig := g.Cfg().MustGet(ctx, "database.default")
if defaultConfig.IsSlice() {
prefix = g.Cfg().MustGet(ctx, "database.default.0.prefix").String()
} else {
prefix = g.Cfg().MustGet(ctx, "database.default.prefix").String()
}
return
}

View File

@@ -272,8 +272,8 @@ func init() {
// 从配置初始化多数据源
if err := manager.InitializeFromConfig(ctx); err != nil {
glog.Errorf(ctx, "❌ Failed to initialize MongoDB datasources: %v", err)
} else {
glog.Infof(ctx, "✅ MongoDB datasources initialized: %v", manager.GetAllDataSourceNames())
} else if names := manager.GetAllDataSourceNames(); len(names) > 0 {
glog.Infof(ctx, "✅ MongoDB datasources initialized: %v", names)
}
// 启动健康检查
@@ -291,7 +291,6 @@ func (m *DataSourceManager) InitializeFromConfig(ctx context.Context) error {
// 获取 mongo 配置下的所有子键
mongoConfig := g.Cfg().MustGet(ctx, "mongo")
if mongoConfig.IsNil() {
glog.Warningf(ctx, "no mongo configuration found in config.yml")
return nil
}

View File

@@ -86,7 +86,11 @@ func (d *BaseDataSource) Connect(ctx context.Context) error {
defer d.mu.Unlock()
// 构建客户端
d.client = ms.New(d.config.Host, ms.WithAPIKey(d.config.APIKey))
host := d.config.Host
if d.config.Port > 0 {
host = fmt.Sprintf("%s:%d", d.config.Host, d.config.Port)
}
d.client = ms.New(host, ms.WithAPIKey(d.config.APIKey))
// 测试连接
if err := d.healthCheck(ctx); err != nil {

View File

@@ -71,22 +71,97 @@ func (m *meilisearchDB) getDataSource() (DataSource, error) {
}
// getClient 获取 Meilisearch 客户端
func (m *meilisearchDB) getClient() (interface{ Index(string) interface{} }, error) {
func (m *meilisearchDB) getClient() (ms.ServiceManager, error) {
source, err := m.getDataSource()
if err != nil {
return nil, err
}
if c, ok := source.Client().(interface{ Index(string) interface{} }); ok {
if c, ok := source.Client().(ms.ServiceManager); ok {
return c, nil
}
return nil, fmt.Errorf("invalid client type")
}
// indexInterface 辅助函数获取index
func indexInterface(indexName string, client interface{ Index(string) interface{} }) interface{} {
func indexInterface(indexName string, client ms.ServiceManager) ms.IndexManager {
return client.Index(indexName)
}
// ensureIndexExists 确保索引存在,不存在则自动创建
// 同时会检查并更新 filterable attributes 设置
func (m *meilisearchDB) ensureIndexExists(client ms.ServiceManager, indexName string) error {
// 使用 Index 方法获取索引(不存在时不会报错)
idx := client.Index(indexName)
// 先获取索引信息,检查是否存在
_, err := idx.FetchInfo()
if err != nil {
// 索引不存在,创建索引并等待完成
task, err := client.CreateIndex(&ms.IndexConfig{
Uid: indexName,
PrimaryKey: "id",
})
if err != nil {
return err
}
// 等待索引创建完成最多等待10秒
if _, err = client.WaitForTask(task.TaskUID, 10*time.Second); err != nil {
return fmt.Errorf("等待索引创建失败: %w", err)
}
// 重新获取索引
idx = client.Index(indexName)
}
// 检查并更新 filterable attributes
settings, err := idx.GetSettings()
if err != nil {
return err
}
requiredFilterable := []string{"tenantId", "isDeleted", "datasetId", "creator", "updater"}
needUpdate := false
// 检查是否缺少必要的 filterable attributes
existingFilterable := make(map[string]bool)
for _, attr := range settings.FilterableAttributes {
existingFilterable[attr] = true
}
for _, attr := range requiredFilterable {
if !existingFilterable[attr] {
needUpdate = true
break
}
}
if needUpdate {
// 合并现有的 filterable attributes 和新增的
allFilterable := append(settings.FilterableAttributes, requiredFilterable...)
uniqueFilterable := make(map[string]bool)
var finalFilterable []string
for _, attr := range allFilterable {
if !uniqueFilterable[attr] {
uniqueFilterable[attr] = true
finalFilterable = append(finalFilterable, attr)
}
}
updateSettings := &ms.Settings{
FilterableAttributes: finalFilterable,
}
task, err := idx.UpdateSettings(updateSettings)
if err != nil {
return err
}
// 等待设置更新完成最多等待10秒
if _, err = client.WaitForTask(task.TaskUID, 10*time.Second); err != nil {
return fmt.Errorf("等待设置更新失败: %w", err)
}
}
return nil
}
// buildSearchRequest 构建搜索请求
func (m *meilisearchDB) buildSearchRequest(ctx context.Context, searchParams *SearchParams) (*ms.SearchRequest, error) {
user, err := utils.GetUserInfo(ctx)
@@ -154,13 +229,18 @@ func (m *meilisearchDB) buildSearchRequest(ctx context.Context, searchParams *Se
return req, nil
}
// Search 搜索文档
// Search 搜索文档(索引不存在时返回空结果)
func (m *meilisearchDB) Search(ctx context.Context, searchParams *SearchParams, indexName string, result interface{}) (total int64, err error) {
client, err := m.getClient()
if err != nil {
return 0, err
}
// 检查索引是否存在,不存在则返回空结果
if _, err = client.GetIndex(indexName); err != nil {
return 0, nil
}
// 构建搜索请求
req, err := m.buildSearchRequest(ctx, searchParams)
if err != nil {
@@ -201,14 +281,7 @@ func (m *meilisearchDB) Search(ctx context.Context, searchParams *SearchParams,
// 执行搜索
idx := indexInterface(indexName, client)
var searchResp *ms.SearchResponse
if i, ok := idx.(interface {
Search(string, *ms.SearchRequest) (*ms.SearchResponse, error)
}); ok {
searchResp, err = i.Search(searchParams.Query, req)
} else {
return 0, fmt.Errorf("index does not support Search method")
}
searchResp, err := idx.Search(searchParams.Query, req)
if err != nil {
return 0, err
}
@@ -261,13 +334,18 @@ func (m *meilisearchDB) Search(ctx context.Context, searchParams *SearchParams,
return
}
// Insert 插入文档
// Insert 插入文档(自动创建索引)
func (m *meilisearchDB) Insert(ctx context.Context, document interface{}, indexName string) (taskUID int64, err error) {
c, err := m.getClient()
if err != nil {
return 0, err
}
// 确保索引存在
if err = m.ensureIndexExists(c, indexName); err != nil {
return 0, err
}
user, err := utils.GetUserInfo(ctx)
if err != nil {
return
@@ -308,14 +386,7 @@ func (m *meilisearchDB) Insert(ctx context.Context, document interface{}, indexN
// 执行插入
documents := []map[string]interface{}{docMap}
idx := indexInterface(indexName, c)
var task *ms.TaskInfo
if i, ok := idx.(interface {
AddDocuments([]map[string]interface{}, interface{}) (*ms.TaskInfo, error)
}); ok {
task, err = i.AddDocuments(documents, nil)
} else {
return 0, fmt.Errorf("index does not support AddDocuments method")
}
task, err := idx.AddDocuments(documents, nil)
if err != nil {
return 0, err
}
@@ -329,13 +400,18 @@ func (m *meilisearchDB) Insert(ctx context.Context, document interface{}, indexN
return task.TaskUID, nil
}
// InsertMany 批量插入文档
// InsertMany 批量插入文档(自动创建索引)
func (m *meilisearchDB) InsertMany(ctx context.Context, documents []interface{}, indexName string) (taskUID int64, err error) {
c, err := m.getClient()
if err != nil {
return 0, err
}
// 确保索引存在
if err = m.ensureIndexExists(c, indexName); err != nil {
return 0, err
}
user, err := utils.GetUserInfo(ctx)
if err != nil {
return 0, err
@@ -379,14 +455,7 @@ func (m *meilisearchDB) InsertMany(ctx context.Context, documents []interface{},
// 执行批量插入
idx := indexInterface(indexName, c)
var task *ms.TaskInfo
if i, ok := idx.(interface {
AddDocuments([]map[string]interface{}, interface{}) (*ms.TaskInfo, error)
}); ok {
task, err = i.AddDocuments(docs, nil)
} else {
return 0, fmt.Errorf("index does not support AddDocuments method")
}
task, err := idx.AddDocuments(docs, nil)
if err != nil {
return 0, err
}
@@ -426,14 +495,7 @@ func (m *meilisearchDB) Update(ctx context.Context, document interface{}, indexN
// 执行更新
documents := []map[string]interface{}{docMap}
idx := indexInterface(indexName, c)
var task *ms.TaskInfo
if i, ok := idx.(interface {
UpdateDocuments([]map[string]interface{}, interface{}) (*ms.TaskInfo, error)
}); ok {
task, err = i.UpdateDocuments(documents, nil)
} else {
return 0, fmt.Errorf("index does not support UpdateDocuments method")
}
task, err := idx.UpdateDocuments(documents, nil)
if err != nil {
return 0, err
}
@@ -456,14 +518,7 @@ func (m *meilisearchDB) Delete(ctx context.Context, id string, indexName string)
// 执行删除
idx := indexInterface(indexName, c)
var task *ms.TaskInfo
if i, ok := idx.(interface {
DeleteDocument(string) (*ms.TaskInfo, error)
}); ok {
task, err = i.DeleteDocument(id)
} else {
return 0, fmt.Errorf("index does not support DeleteDocument method")
}
task, err := idx.DeleteDocument(id, nil)
if err != nil {
return 0, err
}
@@ -504,14 +559,7 @@ func (m *meilisearchDB) DeleteSoft(ctx context.Context, id string, indexName str
// 执行更新
documents := []map[string]interface{}{updateMap}
idx := indexInterface(indexName, c)
var task *ms.TaskInfo
if i, ok := idx.(interface {
UpdateDocuments([]map[string]interface{}, interface{}) (*ms.TaskInfo, error)
}); ok {
task, err = i.UpdateDocuments(documents, nil)
} else {
return 0, fmt.Errorf("index does not support UpdateDocuments method")
}
task, err := idx.UpdateDocuments(documents, nil)
if err != nil {
return 0, err
}
@@ -552,13 +600,7 @@ func (m *meilisearchDB) Get(ctx context.Context, id string, indexName string, re
// 执行查询
var doc map[string]interface{}
idx := indexInterface(indexName, c)
if i, ok := idx.(interface {
GetDocument(string, interface{}) error
}); ok {
err = i.GetDocument(id, &doc)
} else {
return fmt.Errorf("index does not support GetDocument method")
}
err = idx.GetDocument(id, nil, &doc)
if err != nil {
return err
}
@@ -600,122 +642,7 @@ func (m *meilisearchDB) cleanCache(ctx context.Context, indexName string, tenant
return nil
}
// CreateIndex 创建索引
func (m *meilisearchDB) CreateIndex(ctx context.Context, indexConfig *IndexConfig) (taskUID int64, err error) {
client, err := m.getClient()
if err != nil {
return 0, err
}
indexSettings := &ms.IndexConfig{
Uid: indexConfig.UID,
PrimaryKey: indexConfig.PrimaryKey,
}
if c, ok := client.(interface {
CreateIndex(*ms.IndexConfig) (*ms.TaskInfo, error)
}); ok {
task, err := c.CreateIndex(indexSettings)
if err != nil {
return 0, err
}
return task.TaskUID, nil
}
return 0, fmt.Errorf("client does not support CreateIndex")
}
// DeleteIndex 删除索引
func (m *meilisearchDB) DeleteIndex(ctx context.Context, indexName string) (err error) {
client, err := m.getClient()
if err != nil {
return err
}
if c, ok := client.(interface{ DeleteIndex(string) error }); ok {
return c.DeleteIndex(indexName)
}
return fmt.Errorf("client does not support DeleteIndex")
}
// GetIndex 获取索引信息
func (m *meilisearchDB) GetIndex(ctx context.Context, indexName string) (interface{}, error) {
client, err := m.getClient()
if err != nil {
return nil, err
}
if c, ok := client.(interface {
GetIndex(string) (interface{}, error)
}); ok {
return c.GetIndex(indexName)
}
return nil, fmt.Errorf("client does not support GetIndex")
}
// GetIndexes 获取所有索引
func (m *meilisearchDB) GetIndexes(ctx context.Context) (interface{}, error) {
client, err := m.getClient()
if err != nil {
return nil, err
}
if c, ok := client.(interface {
GetIndexes(interface{}) (interface{}, error)
}); ok {
return c.GetIndexes(nil)
}
return nil, fmt.Errorf("client does not support GetIndexes")
}
// UpdateSettings 更新索引设置
func (m *meilisearchDB) UpdateSettings(ctx context.Context, indexName string, settings *ms.Settings) (taskUID int64, err error) {
c, err := m.getClient()
if err != nil {
return 0, err
}
idx := indexInterface(indexName, c)
var task *ms.TaskInfo
if i, ok := idx.(interface {
UpdateSettings(*ms.Settings) (*ms.TaskInfo, error)
}); ok {
task, err = i.UpdateSettings(settings)
} else {
return 0, fmt.Errorf("index does not support UpdateSettings method")
}
if err != nil {
return 0, err
}
return task.TaskUID, nil
}
// GetSettings 获取索引设置
func (m *meilisearchDB) GetSettings(ctx context.Context, indexName string) (*ms.Settings, error) {
c, err := m.getClient()
if err != nil {
return nil, err
}
idx := indexInterface(indexName, c)
var settings *ms.Settings
if i, ok := idx.(interface{ GetSettings() (*ms.Settings, error) }); ok {
settings, err = i.GetSettings()
} else {
return nil, fmt.Errorf("index does not support GetSettings method")
}
if err != nil {
return nil, err
}
return settings, nil
}
// GetClient 获取原始客户端(用于高级操作)
func (m *meilisearchDB) GetClient() (interface{ Index(string) interface{} }, error) {
func (m *meilisearchDB) GetClient() (ms.ServiceManager, error) {
return m.getClient()
}
// BuildUpdateData 构建更新数据
func BuildUpdateData(ctx context.Context, req interface{}) (map[string]interface{}, error) {
return gconv.Map(req), nil
}

56
go.mod
View File

@@ -6,15 +6,30 @@ require (
github.com/alibaba/sentinel-golang v1.0.4
github.com/bwmarrin/snowflake v0.3.0
github.com/cloudwego/eino v0.7.26
github.com/cloudwego/eino-ext/components/document/loader/url v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/document/parser/docx v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/document/parser/pdf v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/document/parser/xlsx v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/embedding/ark v0.1.1
github.com/cloudwego/eino-ext/components/embedding/dashscope v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/embedding/openai v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/indexer/es8 v0.0.0-20260331071634-4f359694d2d9
github.com/cloudwego/eino-ext/components/retriever/es8 v0.0.0-20260331071634-4f359694d2d9
github.com/elastic/go-elasticsearch/v8 v8.16.0
github.com/go-ego/gse v1.0.2
github.com/gogf/gf/contrib/registry/consul/v2 v2.9.5
github.com/gogf/gf/contrib/trace/otlphttp/v2 v2.9.5
github.com/gogf/gf/v2 v2.9.5
github.com/golang/glog v1.2.5
github.com/google/uuid v1.6.0
github.com/meilisearch/meilisearch-go v0.36.1
github.com/minio/minio-go/v7 v7.0.97
github.com/nats-io/nats.go v1.48.0
github.com/olivere/elastic/v7 v7.0.32
github.com/rabbitmq/amqp091-go v1.10.0
github.com/rpcxio/rpcx-consul v0.1.1
github.com/smallnest/rpcx v1.9.1
github.com/tiger1103/gfast-token v1.0.10
go.mongodb.org/mongo-driver/v2 v2.4.0
@@ -24,11 +39,15 @@ require (
require (
github.com/BurntSushi/toml v1.5.0 // indirect
github.com/PuerkitoBio/goquery v1.8.1 // indirect
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/akutz/memconn v0.1.0 // indirect
github.com/alitto/pond v1.9.2 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/andybalholm/cascadia v1.3.1 // indirect
github.com/apache/thrift v0.21.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
@@ -37,18 +56,25 @@ require (
github.com/bytedance/sonic/loader v0.3.0 // indirect
github.com/cenk/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/clbanning/mxj/v2 v2.7.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect
github.com/cloudwego/eino-ext/components/document/parser/html v0.0.0-20241224063832-9fbcc0e56c28 // indirect
github.com/cloudwego/eino-ext/libs/acl/openai v0.1.14 // indirect
github.com/dgraph-io/badger/v4 v4.2.0 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgryski/go-jump v0.0.0-20211018200510-ba001c3ffce0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dslipak/pdf v0.0.2 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/edwingeng/doublejump v1.0.1 // indirect
github.com/eino-contrib/docx2md v0.0.1 // indirect
github.com/eino-contrib/jsonschema v1.0.3 // indirect
github.com/elastic/elastic-transport-go/v8 v8.8.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/evanphx/json-patch v0.5.2 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/fatih/color v1.18.0 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
@@ -58,16 +84,17 @@ require (
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/go-ping/ping v1.2.0 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/godzie44/go-uring v0.0.0-20220926161041-69611e8b13d5 // indirect
github.com/gogf/gf/contrib/nosql/redis/v2 v2.9.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.3.1 // indirect
github.com/golang/glog v1.2.5 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v1.0.0 // indirect
github.com/google/flatbuffers v1.12.1 // indirect
github.com/google/pprof v0.0.0-20250128161936-077ca0a936bf // indirect
github.com/goph/emperror v0.17.2 // indirect
github.com/gorilla/css v1.0.1 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/grandcat/zeroconf v1.0.0 // indirect
github.com/grokify/html-strip-tags-go v0.1.0 // indirect
@@ -81,18 +108,24 @@ require (
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/serf v0.10.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/juju/ratelimit v1.0.2 // indirect
github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/kavu/go_reuseport v1.5.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.11 // indirect
github.com/klauspost/crc32 v1.3.0 // indirect
github.com/klauspost/reedsolomon v1.12.4 // indirect
github.com/libp2p/go-sockaddr v0.2.0 // indirect
github.com/magiconair/properties v1.8.10 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mailru/easyjson v0.9.0 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/meguminnnnnnnnn/go-openai v0.1.1 // indirect
github.com/microcosm-cc/bluemonday v1.0.27 // indirect
github.com/miekg/dns v1.1.63 // indirect
github.com/minio/crc64nvme v1.1.0 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
@@ -100,6 +133,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/nikolalohinski/gonja v1.5.3 // indirect
@@ -107,7 +141,7 @@ require (
github.com/olekukonko/ll v0.0.9 // indirect
github.com/olekukonko/tablewriter v1.1.0 // indirect
github.com/onsi/ginkgo/v2 v2.22.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.9 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/philhofer/fwd v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
@@ -115,8 +149,13 @@ require (
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/quic-go/quic-go v0.49.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/redis/go-redis/v9 v9.12.1 // indirect
github.com/richardlehane/mscfb v1.0.4 // indirect
github.com/richardlehane/msoleps v1.0.4 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rpcxio/libkv v0.5.1 // indirect
github.com/rs/cors v1.11.1 // indirect
github.com/rs/xid v1.6.0 // indirect
github.com/rubyist/circuitbreaker v2.2.1+incompatible // indirect
github.com/shirou/gopsutil/v3 v3.21.6 // indirect
@@ -124,6 +163,7 @@ require (
github.com/slongfield/pyfmt v0.0.0-20220222012616-ea85ff4c361f // indirect
github.com/smallnest/quick v0.2.0 // indirect
github.com/smallnest/rsocket v0.0.0-20241130031020-4a72eb6ff62a // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 // indirect
github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b // indirect
github.com/tinylib/msgp v1.3.0 // indirect
@@ -132,13 +172,19 @@ require (
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/vcaesar/cedar v0.30.0 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/volcengine/volc-sdk-golang v1.0.199 // indirect
github.com/volcengine/volcengine-go-sdk v1.0.181 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/xtaci/kcp-go v5.4.20+incompatible // indirect
github.com/xuri/efp v0.0.0-20240408161823-9ad904a10d6d // indirect
github.com/xuri/excelize/v2 v2.9.0 // indirect
github.com/xuri/nfp v0.0.0-20240318013403-ab9948c2c4a7 // indirect
github.com/yargevad/filepathx v1.0.0 // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
go.opencensus.io v0.23.0 // indirect
@@ -149,9 +195,9 @@ require (
go.opentelemetry.io/otel/sdk v1.38.0 // indirect
go.opentelemetry.io/proto/otlp v1.7.1 // indirect
go.uber.org/mock v0.5.0 // indirect
golang.org/x/arch v0.11.0 // indirect
golang.org/x/arch v0.15.0 // indirect
golang.org/x/crypto v0.41.0 // indirect
golang.org/x/exp v0.0.0-20250128144449-3edf0e91c1ae // indirect
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 // indirect
golang.org/x/mod v0.26.0 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/sync v0.16.0 // indirect

566
go.sum

File diff suppressed because it is too large Load Diff

View File

@@ -11,7 +11,6 @@ import (
_ "gitea.com/red-future/common/consul"
"gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/log/controller"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gclient"
@@ -35,7 +34,7 @@ func init() {
}
//s.Use(common.Cors) //中间件验证
//s.EnablePProf() //启用性能分析
Httpserver.BindMiddlewareDefault(validateFilterPathMiddleware)
//Httpserver.BindMiddlewareDefault(validateFilterPathMiddleware)
Httpserver.SetOpenApiPath("/api.json")
Httpserver.SetDumpRouterMap(true) //关闭打印路由注册信息
Httpserver.BindMiddlewareDefault(ghttp.MiddlewareHandlerResponse)
@@ -70,14 +69,10 @@ func SkipMiddleware(h func(r *ghttp.Request), path string) (handler ghttp.Handle
}
func RouteRegister(controllers []interface{}) {
Httpserver.Group("/log", func(group *ghttp.RouterGroup) {
group.Middleware(jaeger.NewTracer)
group.Bind(controller.OperationLog)
})
Httpserver.BindHandler("/uploadConfig", func(r *ghttp.Request) {
config := g.Config().MustGet(r.GetCtx(), "upload")
r.Response.WriteJsonExit(config)
})
//Httpserver.Group("/log", func(group *ghttp.RouterGroup) {
// group.Middleware(jaeger.NewTracer)
// group.Bind(controller.OperationLog)
//})
re := regexp.MustCompile("[A-Z]")
for _, t := range controllers {
sName := reflect.ValueOf(t).Elem().Type().Name()

View File

@@ -9,7 +9,7 @@ import (
"sync/atomic"
"time"
"gitea.com/red-future/common/redis"
"gitea.com/red-future/common/utils"
"github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/gogf/gf/v2/frame/g"
@@ -1081,7 +1081,7 @@ func syncCircuitBreakerStateToDistributed(ctx context.Context, resourceName, sta
}
// 使用common/redis中的Lock方法获取分布式锁
success, err := redis.Lock(ctx, lockKey, lockTimeout, func(ctx context.Context) error {
success, err := utils.Lock(ctx, lockKey, lockTimeout, func(ctx context.Context) error {
// 设置熔断器状态
_, err := redisClient.Do(ctx, "SETEX", stateKey, ttl, state)
if err != nil {
@@ -1335,7 +1335,7 @@ func resetSingleResource(r *ghttp.Request, resourceName string) error {
if redisClient != nil {
lockKey := "circuit_breaker:" + resourceName + ":lock"
// 使用较短的锁超时时间
success, err := redis.Lock(r.GetCtx(), lockKey, int64(3), func(ctx context.Context) error {
success, err := utils.Lock(r.GetCtx(), lockKey, int64(3), func(ctx context.Context) error {
_, err := redisClient.Del(ctx, "circuit_breaker:"+resourceName+":state")
if err != nil {
g.Log().Warningf(ctx, "清除分布式熔断状态失败: %s, error: %v", resourceName, err)

View File

@@ -62,7 +62,7 @@ func IPLimiter(r *ghttp.Request) {
// UserLimiter 用户维度限流中间件(防止单用户滥用)
func UserLimiter(r *ghttp.Request) {
if r.RequestURI == "/swagger" {
if r.RequestURI == "/swagger" || r.RequestURI == "/admin-go/api/v1/pub/captcha/get" || r.RequestURI == "/admin-go/api/v1/system/login" {
r.Middleware.Next()
return
}

View File

@@ -3,15 +3,15 @@ package minio
import (
"context"
"fmt"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/util/gconv"
"net/http"
"path/filepath"
"strings"
"time"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
"github.com/gogf/gf/v2/os/glog"
"github.com/google/uuid"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
@@ -19,12 +19,11 @@ import (
// IoConfig 映射 YAML 中的 minio 配置节点
type IoConfig struct {
FilePrefix string `yaml:"filePrefix"` // 文件前缀
Endpoint string `yaml:"endpoint"` // MinIO API 地址
AccessKey string `yaml:"accessKey"` // AK
SecretKey string `yaml:"secretKey"` // SK
Secure bool `yaml:"secure"` // 是否启用 SSL
Region string `yaml:"region"` // 区域
Endpoint string `yaml:"endpoint"` // MinIO API 地址
AccessKey string `yaml:"accessKey"` // AK
SecretKey string `yaml:"secretKey"` // SK
Secure bool `yaml:"secure"` // 是否启用 SSL
Region string `yaml:"region"` // 区域
}
// 全局 MinIO 客户端(初始化一次,避免重复创建)
@@ -37,12 +36,11 @@ func init() {
if !g.Cfg().MustGet(ctx, "minio").IsEmpty() {
// 加载 MinIO 配置(可从配置文件/环境变量读取,这里硬编码示例)
minioCfg = IoConfig{
FilePrefix: g.Cfg().MustGet(ctx, "filePrefix").String(),
Endpoint: g.Cfg().MustGet(ctx, "minio.endpoint").String(),
AccessKey: g.Cfg().MustGet(ctx, "minio.accessKey").String(),
SecretKey: g.Cfg().MustGet(ctx, "minio.secretKey").String(),
Secure: g.Cfg().MustGet(ctx, "minio.secure").Bool(),
Region: g.Cfg().MustGet(ctx, "minio.region").String(),
Endpoint: g.Cfg().MustGet(ctx, "minio.endpoint").String(),
AccessKey: g.Cfg().MustGet(ctx, "minio.accessKey").String(),
SecretKey: g.Cfg().MustGet(ctx, "minio.secretKey").String(),
Secure: g.Cfg().MustGet(ctx, "minio.secure").Bool(),
Region: g.Cfg().MustGet(ctx, "minio.region").String(),
}
// 创建 MinIO 客户端
var err error
@@ -56,11 +54,16 @@ func init() {
}
}
func UploadFile(ctx context.Context, fileHeader *ghttp.UploadFile) (imagesUrl string, err error) {
return uploadFile(ctx, getBucketName(ctx), fileHeader)
func UploadFile(ctx context.Context, fileHeader *ghttp.UploadFile) (imagesUrl string, fileName string, fileFormat string, err error) {
return uploadFile(ctx, fileHeader)
}
func uploadFile(ctx context.Context, bucketName string, fileHeader *ghttp.UploadFile) (imagesUrl string, err error) {
func uploadFile(ctx context.Context, fileHeader *ghttp.UploadFile) (imagesUrl string, fileName string, fileFormat string, err error) {
bucketName, err := utils.GetBucketName(ctx)
if err != nil {
glog.Errorf(ctx, "获取桶名称失败: %v", err)
return
}
// 检查/创建桶
exists, err := minioClient.BucketExists(ctx, bucketName)
if err != nil {
@@ -122,25 +125,5 @@ func uploadFile(ctx context.Context, bucketName string, fileHeader *ghttp.Upload
glog.Errorf(ctx, "上传图片失败: %v", err)
return
}
return objectName, err
}
// GetFileAddressPrefix 拼接图片前缀地址
func GetFileAddressPrefix(ctx context.Context) (imageUrl string) {
// 拼接图片前缀地址
var url = "http://"
if minioCfg.Secure {
url = "https://"
}
imgAddressPrefix := url + minioCfg.FilePrefix + "/" + getBucketName(ctx)
return imgAddressPrefix
}
func getBucketName(ctx context.Context) (bucketName string) {
user, err := utils.GetUserInfo(ctx)
if err != nil {
glog.Errorf(ctx, "获取用户信息失败: %v", err)
return
}
return "tenantid-" + gconv.String(user.TenantId)
return objectName, fileHeader.Filename, strings.ReplaceAll(fileExt, ".", ""), err
}

8
rag/eino/consts.go Normal file
View File

@@ -0,0 +1,8 @@
package eino
const (
providerArk = "ark"
providerOpenai = "openai"
providerQianfan = "qianfan"
providerDashscope = "dashscope"
)

View File

@@ -0,0 +1,51 @@
package eino
import (
"context"
"fmt"
"gitea.com/red-future/common/utils"
"github.com/cloudwego/eino-ext/components/document/loader/url"
"github.com/cloudwego/eino-ext/components/document/parser/docx"
"github.com/cloudwego/eino-ext/components/document/parser/pdf"
"github.com/cloudwego/eino-ext/components/document/parser/xlsx"
"github.com/cloudwego/eino/components/document"
"github.com/cloudwego/eino/components/document/parser"
"github.com/cloudwego/eino/schema"
)
// LoadDocument 业务函数:加载文件
func LoadDocument(ctx context.Context, filePath, fileFormat string) (docs []*schema.Document, err error) {
p, err := docsParser(ctx, fileFormat)
if err != nil {
return
}
loader, err := url.NewLoader(ctx, &url.LoaderConfig{
Parser: p,
})
imageUrl, err := utils.GetFileAddressPrefix(ctx)
if err != nil {
return
}
docs, err = loader.Load(context.Background(), document.Source{
URI: fmt.Sprintf("%s%s", imageUrl, filePath),
})
return
}
func docsParser(ctx context.Context, fileFormat string) (p parser.Parser, err error) {
switch fileFormat {
case "docx":
p, err = docx.NewDocxParser(ctx, &docx.Config{
ToSections: true,
IncludeHeaders: true,
IncludeFooters: true,
IncludeTables: true,
})
case "pdf":
p, err = pdf.NewPDFParser(ctx, &pdf.Config{})
case "xlsx":
p, err = xlsx.NewXlsxParser(ctx, &xlsx.Config{})
}
return
}

View File

@@ -0,0 +1,64 @@
package eino
import (
"context"
"github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive"
"github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic"
"github.com/cloudwego/eino/schema"
"github.com/gogf/gf/v2/frame/g"
)
// SemanticSplitDocument 语义分割文档
func SemanticSplitDocument(ctx context.Context, docs []*schema.Document) (res []*schema.Document, err error) {
// 默认分隔符(支持中英文)
separators := []string{"\n\n", "\n", "。", "", "", "", ".", "!", "?", ";"}
// 读取配置,使用合理的默认值
bufferSize := g.Cfg().MustGet(ctx, "eino.splitter.bufferSize").Int()
minChunkSize := g.Cfg().MustGet(ctx, "eino.splitter.minChunkSize").Int()
percentile := g.Cfg().MustGet(ctx, "eino.splitter.percentile").Float64()
batchSize := g.Cfg().MustGet(ctx, "eino.splitter.batchSize").Int()
if batchSize <= 0 {
batchSize = 10 // doubao-embedding-vision 限制每批最多 10 个
}
// 使用批量包装器
var batchEmbedder *BatchEmbedder
provider := g.Cfg().MustGet(ctx, "eino.embedding.provider").String()
switch provider {
case providerArk:
batchEmbedder = NewBatchEmbedder(EmbedderArk, batchSize)
case providerOpenai:
batchEmbedder = NewBatchEmbedder(EmbedderOpenAI, batchSize)
case providerDashscope:
batchEmbedder = NewBatchEmbedder(EmbedderDashscope, batchSize)
}
splitter, err := semantic.NewSplitter(ctx, &semantic.Config{
Embedding: batchEmbedder,
BufferSize: bufferSize,
MinChunkSize: minChunkSize,
Percentile: percentile,
Separators: separators,
})
if err != nil {
return
}
return splitter.Transform(ctx, docs)
}
// RecursiveSplitDocument 递归分割文档
func RecursiveSplitDocument(ctx context.Context, docs []*schema.Document) (res []*schema.Document, err error) {
// 默认分隔符(支持中英文)
separators := []string{"\n\n", "\n", "。", "", "", "", ".", "!", "?", ";"}
splitter, err := recursive.NewSplitter(ctx, &recursive.Config{
ChunkSize: 512,
OverlapSize: 100,
KeepType: recursive.KeepTypeNone,
Separators: separators,
})
if err != nil {
return
}
return splitter.Transform(ctx, docs)
}

69
rag/eino/embedding.go Normal file
View File

@@ -0,0 +1,69 @@
package eino
import (
"context"
"fmt"
"github.com/cloudwego/eino-ext/components/embedding/ark"
"github.com/cloudwego/eino-ext/components/embedding/dashscope"
"github.com/cloudwego/eino-ext/components/embedding/openai"
"github.com/gogf/gf/v2/frame/g"
"github.com/golang/glog"
)
// 全局只初始化一次
var (
EmbedderArk *ark.Embedder
EmbedderDashscope *dashscope.Embedder
EmbedderOpenAI *openai.Embedder
)
func init() {
ctx := context.Background()
if !g.Cfg().MustGet(ctx, "eino.embedding").IsEmpty() {
var err error
provider := g.Cfg().MustGet(ctx, "eino.embedding.provider").String()
switch provider {
case providerArk:
cfg := &ark.EmbeddingConfig{
APIKey: g.Cfg().MustGet(ctx, "eino.embedding.apiKey").String(),
Model: g.Cfg().MustGet(ctx, "eino.embedding.model").String(),
}
if apiType := g.Cfg().MustGet(ctx, "eino.embedding.apiType").String(); apiType != "" {
apiTypeVal := ark.APIType(apiType)
cfg.APIType = &apiTypeVal
}
EmbedderArk, err = ark.NewEmbedder(ctx, cfg)
case providerOpenai:
chatModelConfig := &openai.EmbeddingConfig{
APIKey: g.Cfg().MustGet(ctx, "eino.embedding.apiKey").String(),
Model: g.Cfg().MustGet(ctx, "eino.embedding.model").String(),
}
EmbedderOpenAI, err = openai.NewEmbedder(ctx, chatModelConfig)
case providerDashscope:
cfg := &dashscope.EmbeddingConfig{
APIKey: g.Cfg().MustGet(ctx, "eino.embedding.apiKey").String(),
Model: g.Cfg().MustGet(ctx, "eino.embedding.model").String(),
}
EmbedderDashscope, err = dashscope.NewEmbedder(ctx, cfg)
}
if err != nil {
glog.Fatalf("NewEmbedder of %v error: %v", provider, err)
}
}
return
}
func EmbedStrings(ctx context.Context, texts []string) (embeddings [][]float64, err error) {
provider := g.Cfg().MustGet(ctx, "eino.embedding.provider").String()
switch provider {
case providerArk:
return EmbedderArk.EmbedStrings(ctx, texts)
case providerOpenai:
return EmbedderOpenAI.EmbedStrings(ctx, texts)
case providerDashscope:
return EmbedderDashscope.EmbedStrings(ctx, texts)
}
return nil, fmt.Errorf("unsupported provider: %v", provider)
}

View File

@@ -0,0 +1,47 @@
package eino
import (
"context"
"github.com/cloudwego/eino/components/embedding"
)
// BatchEmbedder 包装器,支持批量限制
type BatchEmbedder struct {
embedder embedding.Embedder
batchSize int
}
// NewBatchEmbedder 创建支持批量限制的 embedding 包装器
func NewBatchEmbedder(embedder embedding.Embedder, batchSize int) *BatchEmbedder {
if batchSize <= 0 {
batchSize = 10 // 默认每批 10 个
}
return &BatchEmbedder{
embedder: embedder,
batchSize: batchSize,
}
}
// EmbedStrings 分批调用 embedding
func (b *BatchEmbedder) EmbedStrings(ctx context.Context, texts []string, opts ...embedding.Option) ([][]float64, error) {
if len(texts) <= b.batchSize {
return b.embedder.EmbedStrings(ctx, texts, opts...)
}
var allEmbeddings [][]float64
for i := 0; i < len(texts); i += b.batchSize {
end := i + b.batchSize
if end > len(texts) {
end = len(texts)
}
batch := texts[i:end]
embeddings, err := b.embedder.EmbedStrings(ctx, batch, opts...)
if err != nil {
return nil, err
}
allEmbeddings = append(allEmbeddings, embeddings...)
}
return allEmbeddings, nil
}

114
rag/gse/utils.go Normal file
View File

@@ -0,0 +1,114 @@
package gse
import (
"context"
"sort"
"github.com/go-ego/gse"
"github.com/go-ego/gse/hmm/extracker"
"github.com/go-ego/gse/hmm/segment"
"github.com/gogf/gf/v2/os/glog"
)
var GseTool *gseTool
// 初始化函数:程序启动时执行一次
func init() {
var err error
GseTool, err = newGseTool()
if err != nil {
glog.Error(context.Background(), err)
}
}
// gseTool 关键词提取工具gse v1.0.2 标准)
type gseTool struct {
seg gse.Segmenter
tfidf *extracker.TagExtracter
tr *extracker.TextRanker
}
// newGseTool 初始化工具(内置词典 + 停用词)
func newGseTool() (tool *gseTool, err error) {
// 1. 初始化分词器
var seg gse.Segmenter
// 内置词典(无外部文件)
err = seg.LoadDictEmbed()
if err != nil {
return
}
// 内置停用词v1.0.2 标准)
err = seg.LoadStopEmbed()
if err != nil {
return
}
// 2. 初始化 TF-IDF 提取器
tfidf := &extracker.TagExtracter{}
tfidf.WithGse(seg)
err = tfidf.LoadIdf()
if err != nil {
return
}
// 3. 初始化 TextRank 提取器
tr := &extracker.TextRanker{}
tr.WithGse(seg)
tool = &gseTool{
seg: seg,
tfidf: tfidf,
tr: tr,
}
return
}
// Cut 分词(关键词提取唯一正确模式:精确模式 + HMM
func (k *gseTool) Cut(text string) []string {
return k.seg.Cut(text, true)
}
// Keyword 最终输出:关键词 + 权重
type Keyword struct {
Word string `json:"word"`
Score float64 `json:"score"`
}
func (k *gseTool) Extract(text string, topN int) []Keyword {
// 1. 提取 TF-IDF
tfTags := k.extractTFIDF(text, topN)
// 2. 提取 TextRank
trTags := k.extractTextRank(text, topN)
// 3. 合并成最终关键词(业务最常用)
scoreMap := make(map[string]float64)
for _, tag := range tfTags {
scoreMap[tag.Text] = tag.Weight
}
for _, tag := range trTags {
scoreMap[tag.Text] = tag.Weight
}
// 转成切片并排序(高分在前)
res := make([]Keyword, 0, len(scoreMap))
for word, score := range scoreMap {
res = append(res, Keyword{Word: word, Score: score})
}
sort.Slice(res, func(i, j int) bool {
return res[i].Score > res[j].Score
})
return res
}
// ExtractTFIDF TF-IDF 关键词带权重90% 业务:文章标签、搜索、关键词
func (k *gseTool) extractTFIDF(text string, topN int) segment.Segments {
return k.tfidf.ExtractTags(text, topN)
}
// ExtractTextRank TextRank 关键词(带权重)长文本、摘要、语义理解
func (k *gseTool) extractTextRank(text string, topN int) segment.Segments {
return k.tr.TextRank(text, topN)
}

View File

@@ -2,7 +2,6 @@ package redis
import (
"context"
"errors"
"strings"
"sync"
"time"
@@ -38,40 +37,6 @@ func RedisClient() *gredis.Redis {
return getClient()
}
// Lock 分布式锁
func Lock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) {
limit := 3
LOOP:
if limit < 0 {
return false, errors.New("锁重试次数耗尽")
}
limit--
client := getClient()
if val, err := client.Set(ctx, key, true, gredis.SetOption{
TTLOption: gredis.TTLOption{
EX: &expireSeconds,
},
NX: true,
}); err != nil {
return false, err
} else {
if val.Bool() {
defer func(client *gredis.Redis, ctx context.Context, key string) {
if _, err = client.Del(ctx, key); err != nil {
glog.Errorf(ctx, "redis client Del error: %v", err)
}
}(client, ctx, key)
if err = fn(ctx); err != nil {
return false, err
}
return true, nil
} else {
time.Sleep(time.Second)
goto LOOP
}
}
}
func GetReadStream(ctx context.Context, msg ...QueueMessage) error {
for _, t := range msg {
err := GetReadFromStream(ctx, t.StreamKey, t.GroupName, t.ConsumerName, t.BatchSize, t.BlockMs, t.AutoAck, t.HandleFunc)

69
rpc/client.go Normal file
View File

@@ -0,0 +1,69 @@
package rpc
import (
"context"
"sync"
"github.com/gogf/gf/v2/frame/g"
consulClient "github.com/rpcxio/rpcx-consul/client"
"github.com/smallnest/rpcx/client"
)
var (
xclientMap = make(map[string]client.XClient)
xclientMu sync.RWMutex
)
// CallWithConsul 基于Consul服务发现调用RPC
func CallWithConsul(ctx context.Context, serviceName string, args, reply interface{}) error {
xclient, err := getOrCreatXClient(serviceName)
if err != nil {
return err
}
err = xclient.Call(ctx, "Mul", args, reply)
if err != nil {
// 调用失败,清理失效客户端
removeXClient(serviceName)
}
return err
}
func removeXClient(serviceName string) {
xclientMu.Lock()
defer xclientMu.Unlock()
if c, ok := xclientMap[serviceName]; ok {
c.Close()
delete(xclientMap, serviceName)
}
}
func getOrCreatXClient(serviceName string) (client.XClient, error) {
// 第一次:读锁,快速判断是否存在
xclientMu.RLock()
if c, ok := xclientMap[serviceName]; ok {
xclientMu.RUnlock()
return c, nil
}
xclientMu.RUnlock()
// 没找到,加写锁准备创建
xclientMu.Lock()
defer xclientMu.Unlock()
// 第二次:双重检查,防止刚被别人创建完
if c, ok := xclientMap[serviceName]; ok {
return c, nil
}
consulAddr := g.Cfg().MustGet(nil, "consul.address").String()
d, err := consulClient.NewConsulDiscovery("rpcx", serviceName, []string{consulAddr}, nil)
if err != nil {
return nil, err
}
c := client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
xclientMap[serviceName] = c
return c, nil
}

View File

@@ -1,364 +1,347 @@
package rpc
import (
"context"
"encoding/json"
"errors"
"strings"
"sync"
"time"
"gitea.com/red-future/common/consul"
"gitea.com/red-future/common/jaeger"
"github.com/gogf/gf/v2/frame/g"
rpcxClient "github.com/smallnest/rpcx/client"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
var (
// pluginsContainer rpcx插件容器全局统一设置
// init()中添加链路追踪插件所有client共用此容器
pluginsContainer = rpcxClient.NewPluginContainer()
// clientPool 连接池缓存key为服务名value为客户端实例
clientPool = make(map[string]*rpcxClient.OneClient)
// poolMutex 连接池锁
poolMutex sync.RWMutex
// healthCheckInterval 健康检查间隔(秒)
healthCheckInterval = 30
// lastHealthCheckTime 上次健康检查时间key为服务名
lastHealthCheckTime = make(map[string]time.Time)
// serviceAddrCache 服务地址缓存key为服务名value为地址
serviceAddrCache = make(map[string]string)
)
func init() {
// 全局设置链路追踪插件所有client共用
pluginsContainer.Add(&TracingPlugin{})
// 启动后台健康检查协程
go healthCheckLoop()
}
// healthCheckLoop 后台健康检查循环
func healthCheckLoop() {
ticker := time.NewTicker(time.Duration(healthCheckInterval) * time.Second)
defer ticker.Stop()
for range ticker.C {
checkAllConnections()
}
}
// checkAllConnections 检查所有缓存连接的健康状态
func checkAllConnections() {
poolMutex.Lock()
defer poolMutex.Unlock()
now := time.Now()
for serviceName, client := range clientPool {
// 检查连接是否需要健康检查
if lastCheck, ok := lastHealthCheckTime[serviceName]; ok {
if now.Sub(lastCheck) < time.Duration(healthCheckInterval)*time.Second {
continue
}
}
ctx := context.Background()
// 检查连接健康状态(心跳检测)
if !isClientHealthy(ctx, client, serviceName) {
g.Log().Warningf(ctx, "检测到服务[%s]连接不健康,将从连接池移除", serviceName)
client.Close()
delete(clientPool, serviceName)
delete(lastHealthCheckTime, serviceName)
delete(serviceAddrCache, serviceName)
continue
}
// 连接健康,检查服务地址是否发生变化
currentAddr, err := consul.GetInstanceAddr(ctx, serviceName)
if err != nil {
g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v保持现有连接", serviceName, err)
lastHealthCheckTime[serviceName] = now
continue
}
// 检查地址是否发生变化
if oldAddr, ok := serviceAddrCache[serviceName]; ok && oldAddr != currentAddr {
g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s重建连接", serviceName, oldAddr, currentAddr)
// 关闭旧连接并从连接池移除,下次请求时会创建新连接
client.Close()
delete(clientPool, serviceName)
delete(lastHealthCheckTime, serviceName)
// 更新缓存的新地址
serviceAddrCache[serviceName] = currentAddr
} else {
// 地址未变化,更新检查时间
if !ok {
serviceAddrCache[serviceName] = currentAddr
}
g.Log().Debugf(ctx, "服务[%s]地址未变化,连接健康", serviceName)
}
lastHealthCheckTime[serviceName] = now
}
}
// isClientHealthy 检查client是否健康
// 使用心跳检测方式:尝试调用服务的心跳方法
func isClientHealthy(ctx context.Context, client *rpcxClient.OneClient, serviceName string) bool {
if client == nil {
return false
}
// 设置较短的超时时间,避免阻塞
pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
// 尝试调用健康检查方法
// 大多数服务都会提供 Ping 或 Health 方法
// 如果服务没有提供这些方法,会返回错误,我们认为是健康的
// 因为连接本身是正常的,只是方法不存在
var reply interface{}
err := client.Call(pingCtx, serviceName, "Ping", nil, &reply)
// 如果调用成功,连接肯定健康
if err == nil {
return true
}
// 如果是方法不存在的错误说明连接是健康的只是服务没有Ping方法
// 这种情况下我们认为是健康的
if isMethodNotFoundError(err) || isServiceNotFoundError(err) {
return true
}
// 其他错误(网络错误、超时等)说明连接不健康
g.Log().Warningf(ctx, "健康检查失败,服务[%s]连接可能不健康: %v", serviceName, err)
return false
}
// isMethodNotFoundError 判断是否是方法未找到错误
func isMethodNotFoundError(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
// rpcx 方法不存在的常见错误信息
return strings.Contains(errStr, "not found") ||
strings.Contains(errStr, "no such") ||
strings.Contains(errStr, "service not found") ||
strings.Contains(errStr, "method not found")
}
// isServiceNotFoundError 判断是否是服务未找到错误
func isServiceNotFoundError(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
return strings.Contains(errStr, "no service") ||
strings.Contains(errStr, "service not registered")
}
// getOrCreateClient 从连接池获取或创建客户端(带连接池)
func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) {
if g.IsEmpty(serviceName) {
return nil, errors.New("服务名称不能为空")
}
// 先尝试从连接池获取
poolMutex.RLock()
client, exists := clientPool[serviceName]
poolMutex.RUnlock()
// 如果存在且健康,直接返回
if exists && isClientHealthy(ctx, client, serviceName) {
g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName)
return client, nil
}
// 不存在或不健康,重新创建
poolMutex.Lock()
defer poolMutex.Unlock()
// 双重检查,防止并发时重复创建
if client, exists := clientPool[serviceName]; exists && isClientHealthy(ctx, client, serviceName) {
return client, nil
}
// 获取服务实例地址
addr, err := consul.GetInstanceAddr(ctx, serviceName)
if err != nil {
g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err)
return nil, err
}
g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr)
// 缓存服务地址,用于健康检查时对比
serviceAddrCache[serviceName] = addr
// 创建服务发现
discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "")
if err != nil {
g.Log().Errorf(ctx, "创建服务发现失败: %v", err)
return nil, err
}
// 创建新客户端
newClient := rpcxClient.NewOneClient(
rpcxClient.Failtry,
rpcxClient.RandomSelect,
discovery,
rpcxClient.DefaultOption,
)
newClient.SetPlugins(pluginsContainer)
// 更新连接池
if oldClient, ok := clientPool[serviceName]; ok && oldClient != nil {
oldClient.Close()
}
clientPool[serviceName] = newClient
lastHealthCheckTime[serviceName] = time.Now()
g.Log().Infof(ctx, "rpcx客户端[%s]创建并加入连接池", serviceName)
return newClient, nil
}
// Call 调用rpcx服务方法
// serviceName: 服务名称
// serviceMethod: 服务方法
// args: 请求参数
// reply: 响应结果
func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error {
// 从连接池获取客户端(不再关闭连接)
client, err := getOrCreateClient(ctx, serviceName)
if err != nil {
g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err)
return err
}
// 设置超时
callCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
// 调用服务方法
err = client.Call(callCtx, serviceName, serviceMethod, args, reply)
if err != nil {
g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, err)
// 如果调用失败,检查连接是否需要重新创建
poolMutex.Lock()
if pooledClient, ok := clientPool[serviceName]; ok && pooledClient == client {
// 标记为不健康,下次请求时会重新创建
delete(lastHealthCheckTime, serviceName)
}
poolMutex.Unlock()
return err
}
return nil
}
// Close 关闭指定服务的连接(用于清理连接池)
func Close(serviceName string) {
poolMutex.Lock()
defer poolMutex.Unlock()
if client, ok := clientPool[serviceName]; ok {
client.Close()
delete(clientPool, serviceName)
delete(lastHealthCheckTime, serviceName)
delete(serviceAddrCache, serviceName)
g.Log().Infof(context.Background(), "rpcx客户端[%s]已从连接池移除", serviceName)
}
}
// CloseAll 关闭所有连接(用于优雅停机)
func CloseAll() {
poolMutex.Lock()
defer poolMutex.Unlock()
for serviceName, client := range clientPool {
client.Close()
g.Log().Infof(context.Background(), "rpcx客户端[%s]已关闭", serviceName)
}
clientPool = make(map[string]*rpcxClient.OneClient)
lastHealthCheckTime = make(map[string]time.Time)
serviceAddrCache = make(map[string]string)
}
// TracingPlugin rpcx链路追踪插件
// 实现 rpcx 的 PreCallPlugin 和 PostCallPlugin 接口
type TracingPlugin struct{}
// PreCall 调用前拦截 - 创建jaeger span
func (p *TracingPlugin) PreCall(ctx context.Context, serviceName, serviceMethod string, args interface{}) (err error) {
// 创建span名称格式: ServiceName.Method
spanName := serviceName + "." + serviceMethod
ctx, span := jaeger.NewSpan(ctx, spanName)
// 记录服务和方法信息
span.SetAttributes(
attribute.String("rpc.service", serviceName),
attribute.String("rpc.method", serviceMethod),
attribute.String("rpc.system", "rpcx"),
)
var data []byte
// 记录请求参数序列化为JSON
if args != nil {
if data, err = json.Marshal(args); err == nil {
argsStr := string(data)
// 限制长度,避免过大
if len(argsStr) > 2000 {
argsStr = argsStr[:2000] + "... (truncated)"
}
span.SetAttributes(attribute.String("rpc.request", argsStr))
}
}
g.Log().Debugf(ctx, "[rpcx] 调用开始: %s.%s", serviceName, serviceMethod)
return
}
// PostCall 调用后拦截 - 记录结果和错误
func (p *TracingPlugin) PostCall(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}, err error) error {
span := trace.SpanFromContext(ctx)
if span != nil && span.IsRecording() {
defer span.End()
// 记录响应结果
if reply != nil {
if data, err := json.Marshal(reply); err == nil {
replyStr := string(data)
// 限制长度,避免过大
if len(replyStr) > 2000 {
replyStr = replyStr[:2000] + "... (truncated)"
}
span.SetAttributes(attribute.String("rpc.response", replyStr))
}
}
// 处理错误
if err != nil {
jaeger.RecordError(ctx, err, "rpcx调用失败")
span.SetStatus(codes.Error, err.Error())
g.Log().Errorf(ctx, "[rpcx] 调用失败: %s.%s, 错误: %v", serviceName, serviceMethod, err)
} else {
g.Log().Debugf(ctx, "[rpcx] 调用成功: %s.%s", serviceName, serviceMethod)
}
}
return nil
}
//var (
// // pluginsContainer rpcx插件容器全局统一设置
// // init()中添加链路追踪插件所有client共用此容器
// pluginsContainer = rpcxClient.NewPluginContainer()
//
// // clientPool 连接池缓存key为服务名value为客户端实例
// clientPool = make(map[string]*rpcxClient.OneClient)
//
// // poolMutex 连接池锁
// poolMutex sync.RWMutex
//
// // healthCheckInterval 健康检查间隔(秒)
// healthCheckInterval = 30
//
// // lastHealthCheckTime 上次健康检查时间key为服务名
// lastHealthCheckTime = make(map[string]time.Time)
//
// // serviceAddrCache 服务地址缓存key为服务名value为地址
// serviceAddrCache = make(map[string]string)
//)
//
//func init() {
// // 全局设置链路追踪插件所有client共用
// pluginsContainer.Add(&TracingPlugin{})
//
// // 启动后台健康检查协程
// go healthCheckLoop()
//}
//
//// healthCheckLoop 后台健康检查循环
//func healthCheckLoop() {
// ticker := time.NewTicker(time.Duration(healthCheckInterval) * time.Second)
// defer ticker.Stop()
//
// for range ticker.C {
// checkAllConnections()
// }
//}
//
//// checkAllConnections 检查所有缓存连接的健康状态
//func checkAllConnections() {
// poolMutex.Lock()
// defer poolMutex.Unlock()
//
// now := time.Now()
// for serviceName, client := range clientPool {
// // 检查连接是否需要健康检查
// if lastCheck, ok := lastHealthCheckTime[serviceName]; ok {
// if now.Sub(lastCheck) < time.Duration(healthCheckInterval)*time.Second {
// continue
// }
// }
//
// ctx := context.Background()
//
// // 检查连接健康状态(心跳检测)
// if !isClientHealthy(ctx, client, serviceName) {
// g.Log().Warningf(ctx, "检测到服务[%s]连接不健康,将从连接池移除", serviceName)
// client.Close()
// delete(clientPool, serviceName)
// delete(lastHealthCheckTime, serviceName)
// delete(serviceAddrCache, serviceName)
// continue
// }
//
// // 连接健康,检查服务地址是否发生变化
// currentAddr, err := consul.GetInstanceAddr(ctx, serviceName)
// if err != nil {
// g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v保持现有连接", serviceName, err)
// lastHealthCheckTime[serviceName] = now
// continue
// }
//
// // 检查地址是否发生变化
// if oldAddr, ok := serviceAddrCache[serviceName]; ok && oldAddr != currentAddr {
// g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s重建连接", serviceName, oldAddr, currentAddr)
// // 关闭旧连接并从连接池移除,下次请求时会创建新连接
// client.Close()
// delete(clientPool, serviceName)
// delete(lastHealthCheckTime, serviceName)
// // 更新缓存的新地址
// serviceAddrCache[serviceName] = currentAddr
// } else {
// // 地址未变化,更新检查时间
// if !ok {
// serviceAddrCache[serviceName] = currentAddr
// }
// g.Log().Debugf(ctx, "服务[%s]地址未变化,连接健康", serviceName)
// }
//
// lastHealthCheckTime[serviceName] = now
// }
//}
//
//// isClientHealthy 检查client是否健康
//// 使用心跳检测方式:尝试调用服务的心跳方法
//func isClientHealthy(ctx context.Context, client *rpcxClient.OneClient, serviceName string) bool {
// if client == nil {
// return false
// }
//
// // 设置较短的超时时间,避免阻塞
// pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
// defer cancel()
//
// // 尝试调用健康检查方法
// // 大多数服务都会提供 Ping 或 Health 方法
// // 如果服务没有提供这些方法,会返回错误,我们认为是健康的
// // 因为连接本身是正常的,只是方法不存在
// var reply interface{}
// err := client.Call(pingCtx, serviceName, "Ping", nil, &reply)
//
// // 如果调用成功,连接肯定健康
// if err == nil {
// return true
// }
//
// // 如果是方法不存在的错误说明连接是健康的只是服务没有Ping方法
// // 这种情况下我们认为是健康的
// if isMethodNotFoundError(err) || isServiceNotFoundError(err) {
// return true
// }
//
// // 其他错误(网络错误、超时等)说明连接不健康
// g.Log().Warningf(ctx, "健康检查失败,服务[%s]连接可能不健康: %v", serviceName, err)
// return false
//}
//
//// isMethodNotFoundError 判断是否是方法未找到错误
//func isMethodNotFoundError(err error) bool {
// if err == nil {
// return false
// }
// errStr := err.Error()
// // rpcx 方法不存在的常见错误信息
// return strings.Contains(errStr, "not found") ||
// strings.Contains(errStr, "no such") ||
// strings.Contains(errStr, "service not found") ||
// strings.Contains(errStr, "method not found")
//}
//
//// isServiceNotFoundError 判断是否是服务未找到错误
//func isServiceNotFoundError(err error) bool {
// if err == nil {
// return false
// }
// errStr := err.Error()
// return strings.Contains(errStr, "no service") ||
// strings.Contains(errStr, "service not registered")
//}
//
//// getOrCreateClient 从连接池获取或创建客户端(带连接池)
//func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) {
// if g.IsEmpty(serviceName) {
// return nil, errors.New("服务名称不能为空")
// }
//
// // 先尝试从连接池获取
// poolMutex.RLock()
// client, exists := clientPool[serviceName]
// poolMutex.RUnlock()
//
// // 如果存在且健康,直接返回
// if exists && isClientHealthy(ctx, client, serviceName) {
// g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName)
// return client, nil
// }
//
// // 不存在或不健康,重新创建
// poolMutex.Lock()
// defer poolMutex.Unlock()
//
// // 双重检查,防止并发时重复创建
// if client, exists := clientPool[serviceName]; exists && isClientHealthy(ctx, client, serviceName) {
// return client, nil
// }
//
// // 获取服务实例地址
// addr, err := consul.GetInstanceAddr(ctx, serviceName)
// if err != nil {
// g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err)
// return nil, err
// }
//
// g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr)
//
// // 缓存服务地址,用于健康检查时对比
// serviceAddrCache[serviceName] = addr
//
// // 创建服务发现
// discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "")
// if err != nil {
// g.Log().Errorf(ctx, "创建服务发现失败: %v", err)
// return nil, err
// }
//
// // 创建新客户端
// newClient := rpcxClient.NewOneClient(
// rpcxClient.Failtry,
// rpcxClient.RandomSelect,
// discovery,
// rpcxClient.DefaultOption,
// )
// newClient.SetPlugins(pluginsContainer)
//
// // 更新连接池
// if oldClient, ok := clientPool[serviceName]; ok && oldClient != nil {
// oldClient.Close()
// }
// clientPool[serviceName] = newClient
// lastHealthCheckTime[serviceName] = time.Now()
//
// g.Log().Infof(ctx, "rpcx客户端[%s]创建并加入连接池", serviceName)
//
// return newClient, nil
//}
//
//// Call 调用rpcx服务方法
//// serviceName: 服务名称
//// serviceMethod: 服务方法
//// args: 请求参数
//// reply: 响应结果
//func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error {
// // 从连接池获取客户端(不再关闭连接)
// client, err := getOrCreateClient(ctx, serviceName)
// if err != nil {
// g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err)
// return err
// }
//
// // 设置超时
// callCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
// defer cancel()
//
// // 调用服务方法
// err = client.Call(callCtx, serviceName, serviceMethod, args, reply)
// if err != nil {
// g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, err)
//
// // 如果调用失败,检查连接是否需要重新创建
// poolMutex.Lock()
// if pooledClient, ok := clientPool[serviceName]; ok && pooledClient == client {
// // 标记为不健康,下次请求时会重新创建
// delete(lastHealthCheckTime, serviceName)
// }
// poolMutex.Unlock()
//
// return err
// }
//
// return nil
//}
//
//// Close 关闭指定服务的连接(用于清理连接池)
//func Close(serviceName string) {
// poolMutex.Lock()
// defer poolMutex.Unlock()
//
// if client, ok := clientPool[serviceName]; ok {
// client.Close()
// delete(clientPool, serviceName)
// delete(lastHealthCheckTime, serviceName)
// delete(serviceAddrCache, serviceName)
// g.Log().Infof(context.Background(), "rpcx客户端[%s]已从连接池移除", serviceName)
// }
//}
//
//// CloseAll 关闭所有连接(用于优雅停机)
//func CloseAll() {
// poolMutex.Lock()
// defer poolMutex.Unlock()
//
// for serviceName, client := range clientPool {
// client.Close()
// g.Log().Infof(context.Background(), "rpcx客户端[%s]已关闭", serviceName)
// }
// clientPool = make(map[string]*rpcxClient.OneClient)
// lastHealthCheckTime = make(map[string]time.Time)
// serviceAddrCache = make(map[string]string)
//}
//
//// TracingPlugin rpcx链路追踪插件
//// 实现 rpcx 的 PreCallPlugin 和 PostCallPlugin 接口
//type TracingPlugin struct{}
//
//// PreCall 调用前拦截 - 创建jaeger span
//func (p *TracingPlugin) PreCall(ctx context.Context, serviceName, serviceMethod string, args interface{}) (err error) {
// // 创建span名称格式: ServiceName.Method
// spanName := serviceName + "." + serviceMethod
// ctx, span := jaeger.NewSpan(ctx, spanName)
//
// // 记录服务和方法信息
// span.SetAttributes(
// attribute.String("rpc.service", serviceName),
// attribute.String("rpc.method", serviceMethod),
// attribute.String("rpc.system", "rpcx"),
// )
// var data []byte
// // 记录请求参数序列化为JSON
// if args != nil {
// if data, err = json.Marshal(args); err == nil {
// argsStr := string(data)
// // 限制长度,避免过大
// if len(argsStr) > 2000 {
// argsStr = argsStr[:2000] + "... (truncated)"
// }
// span.SetAttributes(attribute.String("rpc.request", argsStr))
// }
// }
//
// g.Log().Debugf(ctx, "[rpcx] 调用开始: %s.%s", serviceName, serviceMethod)
//
// return
//}
//
//// PostCall 调用后拦截 - 记录结果和错误
//func (p *TracingPlugin) PostCall(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}, err error) error {
// span := trace.SpanFromContext(ctx)
// if span != nil && span.IsRecording() {
// defer span.End()
//
// // 记录响应结果
// if reply != nil {
// if data, err := json.Marshal(reply); err == nil {
// replyStr := string(data)
// // 限制长度,避免过大
// if len(replyStr) > 2000 {
// replyStr = replyStr[:2000] + "... (truncated)"
// }
// span.SetAttributes(attribute.String("rpc.response", replyStr))
// }
// }
//
// // 处理错误
// if err != nil {
// jaeger.RecordError(ctx, err, "rpcx调用失败")
// span.SetStatus(codes.Error, err.Error())
// g.Log().Errorf(ctx, "[rpcx] 调用失败: %s.%s, 错误: %v", serviceName, serviceMethod, err)
// } else {
// g.Log().Debugf(ctx, "[rpcx] 调用成功: %s.%s", serviceName, serviceMethod)
// }
// }
//
// return nil
//}

93
rpc/server.go Normal file
View File

@@ -0,0 +1,93 @@
package rpc
import (
"context"
"errors"
"fmt"
"net"
"strconv"
"strings"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/rpcxio/rpcx-consul/serverplugin"
"github.com/smallnest/rpcx/server"
)
var rpcServer *server.Server
// Serve 启动RPC服务
func Serve(ctx context.Context, service interface{}) error {
// 获取本机IP
ip, err := getLocalIP()
if err != nil {
return fmt.Errorf("获取IP失败: %w", err)
}
// 解析端口
addrConfig := g.Cfg().MustGet(ctx, "server.address").String()
portStr := strings.TrimPrefix(addrConfig, ":")
port, err := strconv.Atoi(portStr)
if err != nil {
return errors.New("端口解析失败")
}
serviceAddr := fmt.Sprintf("%s:%d", ip, port)
// 创建服务端
rpcServer = server.NewServer()
// 添加 Consul 注册插件
consulAddr := g.Cfg().MustGet(ctx, "consul.address").String()
if consulAddr != "" {
plugin := &serverplugin.ConsulRegisterPlugin{
ServiceAddress: "tcp@" + serviceAddr,
ConsulServers: []string{consulAddr},
BasePath: "rpcx",
UpdateInterval: time.Minute,
}
if err := plugin.Start(); err != nil {
return err
}
rpcServer.Plugins.Add(plugin)
g.Log().Infof(ctx, "Consul注册成功: %s", serviceAddr)
}
// 注册服务
err = rpcServer.Register(service, "")
if err != nil {
g.Log().Errorf(ctx, "注册服务失败: %v", err)
return nil
}
// 优雅关闭
//gproc.AddShutdownFunc(func(ctx context.Context) {
// if rpcServer != nil {
// _ = rpcServer.Shutdown(ctx)
// }
//})
// 异步启动
go func() {
g.Log().Infof(ctx, "RPC服务启动: %s", serviceAddr)
if err := rpcServer.Serve("tcp", serviceAddr); err != nil {
g.Log().Fatalf(ctx, "RPC服务启动失败: %v", err)
}
}()
return nil
}
func getLocalIP() (string, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {
return "", err
}
for _, addr := range addrs {
if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
if ipNet.IP.To4() != nil {
return ipNet.IP.String(), nil
}
}
}
return "", fmt.Errorf("未找到本机IP")
}

View File

@@ -3,6 +3,7 @@ package utils
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"reflect"
@@ -13,10 +14,12 @@ import (
"time"
"gitea.com/red-future/common/beans"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"github.com/tiger1103/gfast-token/gftoken"
@@ -385,3 +388,58 @@ func intPow10(n int) int {
}
return result
}
// GetFileAddressPrefix 拼接图片前缀地址
func GetFileAddressPrefix(ctx context.Context) (imageUrl string, err error) {
// 拼接图片前缀地址
bucketName, err := GetBucketName(ctx)
if err != nil {
return
}
imageUrl = fmt.Sprintf("%s/%s", g.Cfg().MustGet(ctx, "filePrefix").String(), bucketName)
return
}
// GetBucketName 获取bucket名称
func GetBucketName(ctx context.Context) (bucketName string, err error) {
user, err := GetUserInfo(ctx)
if err != nil {
return
}
bucketName = fmt.Sprintf("tenantid-%d", user.TenantId)
return
}
// Lock 分布式锁
func Lock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) {
limit := 3
LOOP:
if limit < 0 {
return false, errors.New("锁重试次数耗尽")
}
limit--
var val *gvar.Var
if val, err = g.Redis().Set(ctx, key, true, gredis.SetOption{
TTLOption: gredis.TTLOption{
EX: &expireSeconds,
},
NX: true,
}); err != nil {
return false, err
}
if val.Bool() {
defer func(ctx context.Context, key string) {
if _, err = g.Redis().Del(ctx, key); err != nil {
glog.Errorf(ctx, "redis client Del error: %v", err)
}
}(ctx, key)
if err = fn(ctx); err != nil {
return false, err
}
return true, nil
}
time.Sleep(time.Second)
goto LOOP
}