9 Commits

Author SHA1 Message Date
qhd
62a09c0e49 feat: 去掉跨域,在网关统一做跨域处理 2026-04-02 16:37:32 +08:00
qhd
0c2e36f607 feat: gfdb增加noTenantId 2026-04-02 10:37:31 +08:00
b1c62b3f16 mongo初始化控制台打印优化 2026-04-02 09:12:59 +08:00
81d8b70f0d Dockerfile 2026-04-01 16:18:04 +08:00
41b2a37fc0 Dockerfile 2026-04-01 14:19:50 +08:00
0ad6bc9438 Dockerfile 2026-04-01 14:11:07 +08:00
qhd
7d2dec86b4 feat: 添加基于Consul的RPC客户端和服务端 2026-04-01 13:39:32 +08:00
qhd
78969f5e6a Merge branch 'dev' of http://116.204.74.41:3000/red-future/common into dev 2026-04-01 13:38:57 +08:00
qhd
2e4a0a89f1 feat: 支持多模型提供商 embedding 2026-04-01 13:38:33 +08:00
15 changed files with 915 additions and 485 deletions

View File

@@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"net/http"
"sync" "sync"
"time" "time"
@@ -20,6 +21,8 @@ var (
consulAddr string consulAddr string
reconnectMutex sync.RWMutex reconnectMutex sync.RWMutex
reconnectDone chan struct{} reconnectDone chan struct{}
connected bool
httpClient *http.Client
) )
// connectConsul 连接 Consul // connectConsul 连接 Consul
@@ -27,6 +30,11 @@ func connectConsul(ctx context.Context) error {
reconnectMutex.Lock() reconnectMutex.Lock()
defer reconnectMutex.Unlock() defer reconnectMutex.Unlock()
// 如果已经连接,不再重复连接
if connected && registry != nil {
return nil
}
var err error var err error
registry, err = consul.New(consul.WithAddress(consulAddr)) registry, err = consul.New(consul.WithAddress(consulAddr))
if err != nil { if err != nil {
@@ -36,11 +44,9 @@ func connectConsul(ctx context.Context) error {
gsvc.SetRegistry(registry) gsvc.SetRegistry(registry)
gsel.SetBuilder(gsel.NewBuilderRoundRobin()) gsel.SetBuilder(gsel.NewBuilderRoundRobin())
connected = true
g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr) g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr)
// 启动健康检查和自动重连
go startHealthCheckAndReconnect()
return nil return nil
} }
@@ -51,11 +57,16 @@ func startHealthCheckAndReconnect() {
} }
reconnectDone = make(chan struct{}) reconnectDone = make(chan struct{})
ticker := time.NewTicker(5 * time.Second) ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop() defer ticker.Stop()
ctx := context.Background() ctx := context.Background()
// 初始化HTTP客户端用于健康检查
httpClient = &http.Client{
Timeout: 5 * time.Second,
}
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
@@ -65,8 +76,15 @@ func startHealthCheckAndReconnect() {
} }
g.Log().Warning(ctx, "⚠️ Consul 连接异常,尝试重新连接...") g.Log().Warning(ctx, "⚠️ Consul 连接异常,尝试重新连接...")
// 重置连接状态并重连
reconnectMutex.Lock()
connected = false
registry = nil
reconnectMutex.Unlock()
if err := connectConsul(ctx); err != nil { if err := connectConsul(ctx); err != nil {
g.Log().Errorf(ctx, "❌ Consul 重连失败: %v,5秒后重试...", err) g.Log().Errorf(ctx, "❌ Consul 重连失败: %v,30秒后重试...", err)
} }
case <-reconnectDone: case <-reconnectDone:
@@ -81,18 +99,26 @@ func checkConsulHealth(ctx context.Context) bool {
reconnectMutex.RLock() reconnectMutex.RLock()
defer reconnectMutex.RUnlock() defer reconnectMutex.RUnlock()
if registry == nil { if registry == nil || !connected {
return false return false
} }
// 尝试获取服务列表来检测连接是否正常 // 使用consul原生API进行健康检查
services, err := registry.Search(ctx, gsvc.SearchInput{}) // 调用 /v1/agent/self 接口检测连接状态
url := fmt.Sprintf("http://%s/v1/agent/self", consulAddr)
resp, err := httpClient.Get(url)
if err != nil { if err != nil {
g.Log().Debugf(ctx, "Consul 健康检查失败: %v", err) g.Log().Debugf(ctx, "Consul 健康检查失败: %v", err)
return false return false
} }
defer resp.Body.Close()
g.Log().Debugf(ctx, "✅ Consul 健康检查通过,发现 %d 个服务", len(services)) if resp.StatusCode != http.StatusOK {
g.Log().Debugf(ctx, "Consul 健康检查失败,状态码: %d", resp.StatusCode)
return false
}
//g.Log().Debugf(ctx, "✅ Consul 健康检查通过")
return true return true
} }
@@ -102,8 +128,12 @@ func init() {
g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化") g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化")
return return
} }
if err := connectConsul(context.Background()); err != nil { if err := connectConsul(context.Background()); err != nil {
g.Log().Errorf(context.Background(), "❌ Consul 初始化失败: %v", err) g.Log().Errorf(context.Background(), "❌ Consul 初始化失败: %v", err)
} else {
// 连接成功后启动健康检查和自动重连
go startHealthCheckAndReconnect()
} }
} }
func getLocalIP() (string, error) { func getLocalIP() (string, error) {

View File

@@ -9,6 +9,7 @@ import (
"strings" "strings"
"time" "time"
"gitea.com/red-future/common/beans"
"gitea.com/red-future/common/utils" "gitea.com/red-future/common/utils"
"github.com/bwmarrin/snowflake" "github.com/bwmarrin/snowflake"
"github.com/gogf/gf/v2/crypto/gmd5" "github.com/gogf/gf/v2/crypto/gmd5"
@@ -280,11 +281,48 @@ func deleteHook(ctx context.Context, in *gdb.HookDeleteInput) (result sql.Result
// ==================== Select钩子缓存读取 ==================== // ==================== Select钩子缓存读取 ====================
func selectHook(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result, err error) { func selectHook(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result, err error) {
traceID := getTraceID(ctx) var tenantId uint64
// ===================== 最终版安全追加租户ID =====================
tenantEnabled, err := gcache.Get(ctx, getTraceID(ctx, noTenantIdKeyPrefix))
if err != nil {
return
}
if !gconv.Bool(tenantEnabled) {
user, err := utils.GetUserInfo(ctx)
if err != nil {
return nil, err
}
tenantId = user.TenantId
// 【关键修复】找到 SQL 中第一个出现的 ORDER BY / GROUP BY / LIMIT 等关键字位置
sql := in.Sql
insertPos := len(sql)
keywords := []string{" ORDER BY ", " GROUP BY ", " HAVING ", " LIMIT ", " FOR UPDATE"}
for _, kw := range keywords {
if idx := gstr.PosI(sql, kw); idx != -1 {
insertPos = idx
break
}
}
enabled, err := gcache.Get(ctx, traceID) // 【正确拼接】把条件插入到关键字之前,而不是直接拼在最后
condition := " " + beans.DefSQLBaseCol.TenantId + " = ?"
if gstr.Contains(gstr.ToUpper(sql), " WHERE ") {
// 有 WHERE → 加 AND
in.Sql = sql[:insertPos] + " AND" + condition + sql[insertPos:]
} else {
// 无 WHERE → 加 WHERE
in.Sql = sql[:insertPos] + " WHERE" + condition + sql[insertPos:]
}
in.Args = append(in.Args, tenantId)
}
// ==================================================================
cacheEnabled, err := gcache.Get(ctx, getTraceID(ctx, cacheKeyPrefix))
if err != nil {
return
}
// 未启用缓存,直接执行查询 // 未启用缓存,直接执行查询
if !gconv.Bool(enabled) { if !gconv.Bool(cacheEnabled) {
return in.Next(ctx) return in.Next(ctx)
} }
@@ -303,18 +341,13 @@ func selectHook(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result
} }
} }
user, err := utils.GetUserInfo(ctx)
if err != nil {
return nil, err
}
encrypt, err := gmd5.Encrypt(fmt.Sprintf("%s:%s", whereCondition, in.Args)) encrypt, err := gmd5.Encrypt(fmt.Sprintf("%s:%s", whereCondition, in.Args))
if err != nil { if err != nil {
return nil, err return nil, err
} }
// 构建缓存keysql:tenantId:table:where条件:args // 构建缓存keysql:tenantId:table:where条件:args
cacheKey := fmt.Sprintf("%s:%s:%s", getCacheKey(user.TenantId, in.Table, false), getSelectTypeString(in.SelectType), encrypt) cacheKey := fmt.Sprintf("%s:%s:%s", getCacheKey(tenantId, in.Table, false), getSelectTypeString(in.SelectType), encrypt)
// 1. 先查缓存 // 1. 先查缓存
if data, ok := getFromCache(ctx, cacheKey); ok { if data, ok := getFromCache(ctx, cacheKey); ok {
@@ -343,7 +376,12 @@ func selectHook(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result
} }
func getCacheKey(tenantId uint64, table string, isBlur bool) string { func getCacheKey(tenantId uint64, table string, isBlur bool) string {
cacheKey := fmt.Sprintf("sql:tenantId-%v:%s", tenantId, table) var cacheKey string
if g.IsEmpty(tenantId) {
cacheKey = fmt.Sprintf("sql:%s", table)
} else {
cacheKey = fmt.Sprintf("sql:tenantId-%v:%s", tenantId, table)
}
if isBlur { if isBlur {
cacheKey = fmt.Sprintf("%s:*", cacheKey) cacheKey = fmt.Sprintf("%s:*", cacheKey)
} }
@@ -369,10 +407,13 @@ func getSelectTypeString(selectType gdb.SelectType) string {
// ==================== 调用方法 ==================== // ==================== 调用方法 ====================
var ( var (
schemaPrefix = "tenant-" schemaPrefix = "tenant-"
cacheKeyPrefix = "cache-"
noTenantIdKeyPrefix = "tenantId-"
) )
type Gfdb interface { type Gfdb interface {
Exec(ctx context.Context, sql string, args ...any) (sql.Result, error)
Model(ctx context.Context, tableNameOrStruct ...any) *model Model(ctx context.Context, tableNameOrStruct ...any) *model
Transaction(ctx context.Context, f func(ctx context.Context, tx gdb.TX) error) error Transaction(ctx context.Context, f func(ctx context.Context, tx gdb.TX) error) error
} }
@@ -381,25 +422,43 @@ type cache interface {
Cache(ctx context.Context) *gdb.Model Cache(ctx context.Context) *gdb.Model
} }
type model struct { type noTenantId interface {
*gdb.Model NoTenantId(ctx context.Context) *modelCache
} }
type dataBase struct { type dataBase struct {
gdb.DB gdb.DB
} }
func DB(ctx context.Context) Gfdb { type model struct {
var dbName []string *gdb.Model
}
type modelCache struct {
*model
}
func checkSchemaConfig(ctx context.Context) (uint64, bool) {
user, err := utils.GetUserInfo(ctx) user, err := utils.GetUserInfo(ctx)
if err != nil { if err != nil {
glog.Errorf(ctx, "[DB] GetUserInfo error: %v", err) glog.Errorf(ctx, "[DB] GetUserInfo error: %v", err)
return nil return 0, false
} }
var schema = fmt.Sprintf("%s%v", schemaPrefix, user.TenantId) var schema = fmt.Sprintf("%s%v", schemaPrefix, user.TenantId)
sprintf := fmt.Sprintf("database.%s", schema) sprintf := fmt.Sprintf("database.%s", schema)
if !g.Cfg().MustGet(ctx, sprintf).IsEmpty() { if !g.Cfg().MustGet(ctx, sprintf).IsEmpty() {
return user.TenantId, true
}
return user.TenantId, false
}
func DB(ctx context.Context) Gfdb {
tenantId, config := checkSchemaConfig(ctx)
var schema = fmt.Sprintf("%s%v", schemaPrefix, tenantId)
var dbName []string
if config {
dbName = append(dbName, schema) dbName = append(dbName, schema)
} else { } else {
dbName = append(dbName, "default") dbName = append(dbName, "default")
@@ -418,26 +477,21 @@ func DB(ctx context.Context) Gfdb {
} }
func (d *dataBase) Model(ctx context.Context, tableNameOrStruct ...any) *model { func (d *dataBase) Model(ctx context.Context, tableNameOrStruct ...any) *model {
user, err := utils.GetUserInfo(ctx)
if err != nil {
glog.Errorf(ctx, "[DB] GetUserInfo error: %v", err)
return nil
}
m := d.DB.Model(tableNameOrStruct...).Ctx(ctx) m := d.DB.Model(tableNameOrStruct...).Ctx(ctx)
var schema = fmt.Sprintf("%s%v", schemaPrefix, user.TenantId) tenantId, config := checkSchemaConfig(ctx)
sprintf := fmt.Sprintf("database.%s", schema)
if !g.Cfg().MustGet(ctx, sprintf).IsEmpty() { if config {
// 创建按地区分库的配置 // 创建按地区分库的配置
shardingConfig := gdb.ShardingConfig{ shardingConfig := gdb.ShardingConfig{
Schema: gdb.ShardingSchemaConfig{ Schema: gdb.ShardingSchemaConfig{
Enable: true, // 启用分库 Enable: true, // 启用分库
Prefix: schemaPrefix, // 分库前缀 Prefix: schemaPrefix, // 分库前缀
Rule: &RegionShardingRule{RegionMapping: user.TenantId}, // 自定义分库规则 Rule: &RegionShardingRule{RegionMapping: tenantId}, // 自定义分库规则
}, },
} }
m.Sharding(shardingConfig).ShardingValue(user.TenantId) m.Sharding(shardingConfig).ShardingValue(tenantId)
} }
m.OmitNil().Hook(catchSQLHook()) m.OmitNil().Hook(catchSQLHook())
@@ -451,7 +505,7 @@ func (d *dataBase) Transaction(ctx context.Context, f func(ctx context.Context,
} }
func (d *model) Cache(ctx context.Context) *gdb.Model { func (d *model) Cache(ctx context.Context) *gdb.Model {
traceID := getTraceID(ctx) traceID := getTraceID(ctx, cacheKeyPrefix)
if traceID == "" { if traceID == "" {
glog.Errorf(ctx, "[DB] GetTraceID error: traceID is empty") glog.Errorf(ctx, "[DB] GetTraceID error: traceID is empty")
return nil return nil
@@ -463,11 +517,28 @@ func (d *model) Cache(ctx context.Context) *gdb.Model {
return d.Model return d.Model
} }
func (d *model) NoTenantId(ctx context.Context) *modelCache {
traceID := getTraceID(ctx, noTenantIdKeyPrefix)
if traceID == "" {
glog.Errorf(ctx, "[DB] GetTraceID error: traceID is empty")
return nil
}
if err := gcache.Set(ctx, traceID, true, time.Second); err != nil {
glog.Errorf(ctx, "[DB] Cache error: %v", err)
return nil
}
return &modelCache{
&model{
Model: d.Model,
},
}
}
// getTraceID 从 context 中获取链路追踪 ID // getTraceID 从 context 中获取链路追踪 ID
func getTraceID(ctx context.Context) string { func getTraceID(ctx context.Context, prefix string) string {
span := trace.SpanFromContext(ctx) span := trace.SpanFromContext(ctx)
if span != nil && span.SpanContext().HasTraceID() { if span != nil && span.SpanContext().HasTraceID() {
return span.SpanContext().TraceID().String() return fmt.Sprintf("%s%v", prefix, span.SpanContext().TraceID().String())
} }
return "" return ""
} }
@@ -494,3 +565,19 @@ func (r *RegionShardingRule) TableName(ctx context.Context, config gdb.ShardingT
// 这里不实现分表,返回空字符串 // 这里不实现分表,返回空字符串
return "", nil return "", nil
} }
func GetTablePrefix(ctx context.Context) (prefix string, err error) {
tenantId, config := checkSchemaConfig(ctx)
if config {
sprintf := fmt.Sprintf("database.%s%v.0.prefix", schemaPrefix, tenantId)
prefix = g.Cfg().MustGet(ctx, sprintf).String()
return
}
defaultConfig := g.Cfg().MustGet(ctx, "database.default")
if defaultConfig.IsSlice() {
prefix = g.Cfg().MustGet(ctx, "database.default.0.prefix").String()
} else {
prefix = g.Cfg().MustGet(ctx, "database.default.prefix").String()
}
return
}

View File

@@ -272,8 +272,8 @@ func init() {
// 从配置初始化多数据源 // 从配置初始化多数据源
if err := manager.InitializeFromConfig(ctx); err != nil { if err := manager.InitializeFromConfig(ctx); err != nil {
glog.Errorf(ctx, "❌ Failed to initialize MongoDB datasources: %v", err) glog.Errorf(ctx, "❌ Failed to initialize MongoDB datasources: %v", err)
} else { } else if names := manager.GetAllDataSourceNames(); len(names) > 0 {
glog.Infof(ctx, "✅ MongoDB datasources initialized: %v", manager.GetAllDataSourceNames()) glog.Infof(ctx, "✅ MongoDB datasources initialized: %v", names)
} }
// 启动健康检查 // 启动健康检查
@@ -291,7 +291,6 @@ func (m *DataSourceManager) InitializeFromConfig(ctx context.Context) error {
// 获取 mongo 配置下的所有子键 // 获取 mongo 配置下的所有子键
mongoConfig := g.Cfg().MustGet(ctx, "mongo") mongoConfig := g.Cfg().MustGet(ctx, "mongo")
if mongoConfig.IsNil() { if mongoConfig.IsNil() {
glog.Warningf(ctx, "no mongo configuration found in config.yml")
return nil return nil
} }

11
go.mod
View File

@@ -12,7 +12,10 @@ require (
github.com/cloudwego/eino-ext/components/document/parser/xlsx v0.0.0-20260323112355-f061db7e8419 github.com/cloudwego/eino-ext/components/document/parser/xlsx v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive v0.0.0-20260323112355-f061db7e8419 github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic v0.0.0-20260323112355-f061db7e8419 github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/embedding/ark v0.1.1
github.com/cloudwego/eino-ext/components/embedding/dashscope v0.0.0-20260323112355-f061db7e8419 github.com/cloudwego/eino-ext/components/embedding/dashscope v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/embedding/openai v0.0.0-20260323112355-f061db7e8419
github.com/go-ego/gse v1.0.2
github.com/gogf/gf/contrib/registry/consul/v2 v2.9.5 github.com/gogf/gf/contrib/registry/consul/v2 v2.9.5
github.com/gogf/gf/contrib/trace/otlphttp/v2 v2.9.5 github.com/gogf/gf/contrib/trace/otlphttp/v2 v2.9.5
github.com/gogf/gf/v2 v2.9.5 github.com/gogf/gf/v2 v2.9.5
@@ -55,7 +58,7 @@ require (
github.com/clbanning/mxj/v2 v2.7.0 // indirect github.com/clbanning/mxj/v2 v2.7.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect github.com/cloudwego/base64x v0.1.6 // indirect
github.com/cloudwego/eino-ext/components/document/parser/html v0.0.0-20241224063832-9fbcc0e56c28 // indirect github.com/cloudwego/eino-ext/components/document/parser/html v0.0.0-20241224063832-9fbcc0e56c28 // indirect
github.com/cloudwego/eino-ext/libs/acl/openai v0.1.2 // indirect github.com/cloudwego/eino-ext/libs/acl/openai v0.1.14 // indirect
github.com/dgraph-io/badger/v4 v4.2.0 // indirect github.com/dgraph-io/badger/v4 v4.2.0 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgryski/go-jump v0.0.0-20211018200510-ba001c3ffce0 // indirect github.com/dgryski/go-jump v0.0.0-20211018200510-ba001c3ffce0 // indirect
@@ -100,6 +103,7 @@ require (
github.com/hashicorp/go-rootcerts v1.0.2 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/serf v0.10.1 // indirect github.com/hashicorp/serf v0.10.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect github.com/json-iterator/go v1.1.12 // indirect
github.com/juju/ratelimit v1.0.2 // indirect github.com/juju/ratelimit v1.0.2 // indirect
@@ -115,7 +119,7 @@ require (
github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/meguminnnnnnnnn/go-openai v0.1.0 // indirect github.com/meguminnnnnnnnn/go-openai v0.1.1 // indirect
github.com/microcosm-cc/bluemonday v1.0.27 // indirect github.com/microcosm-cc/bluemonday v1.0.27 // indirect
github.com/miekg/dns v1.1.63 // indirect github.com/miekg/dns v1.1.63 // indirect
github.com/minio/crc64nvme v1.1.0 // indirect github.com/minio/crc64nvme v1.1.0 // indirect
@@ -163,8 +167,11 @@ require (
github.com/tklauser/numcpus v0.2.2 // indirect github.com/tklauser/numcpus v0.2.2 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/fastrand v1.1.0 // indirect
github.com/vcaesar/cedar v0.30.0 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/volcengine/volc-sdk-golang v1.0.23 // indirect
github.com/volcengine/volcengine-go-sdk v1.0.181 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/scram v1.1.2 // indirect

33
go.sum
View File

@@ -41,6 +41,7 @@ github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A= github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
@@ -67,8 +68,8 @@ github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgIS
github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE= github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE=
github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M=
github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM=
github.com/bytedance/mockey v1.2.14 h1:KZaFgPdiUwW+jOWFieo3Lr7INM1P+6adO3hxZhDswY8= github.com/bytedance/mockey v1.3.0 h1:ONLRdvhqmCfr9rTasUB8ZKCfvbdD2tohOg4u+4Q/ed0=
github.com/bytedance/mockey v1.2.14/go.mod h1:1BPHF9sol5R1ud/+0VEHGQq/+i2lN+GTsr3O2Q9IENY= github.com/bytedance/mockey v1.3.0/go.mod h1:1BPHF9sol5R1ud/+0VEHGQq/+i2lN+GTsr3O2Q9IENY=
github.com/bytedance/sonic v1.14.1 h1:FBMC0zVz5XUmE4z9wF4Jey0An5FueFvOsTKKKtwIl7w= github.com/bytedance/sonic v1.14.1 h1:FBMC0zVz5XUmE4z9wF4Jey0An5FueFvOsTKKKtwIl7w=
github.com/bytedance/sonic v1.14.1/go.mod h1:gi6uhQLMbTdeP0muCnrjHLeCUPyb70ujhnNlhOylAFc= github.com/bytedance/sonic v1.14.1/go.mod h1:gi6uhQLMbTdeP0muCnrjHLeCUPyb70ujhnNlhOylAFc=
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=
@@ -109,10 +110,14 @@ github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive
github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive v0.0.0-20260323112355-f061db7e8419/go.mod h1:9R0RQrQSpg1JaNnRtw7+RfRAAv0HgdE348YnrlZ6coo= github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive v0.0.0-20260323112355-f061db7e8419/go.mod h1:9R0RQrQSpg1JaNnRtw7+RfRAAv0HgdE348YnrlZ6coo=
github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic v0.0.0-20260323112355-f061db7e8419 h1:XsvQmwMKMD/w/YIPK0Y9pfBv0UH6rJ5ZiedUuZiA9Vo= github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic v0.0.0-20260323112355-f061db7e8419 h1:XsvQmwMKMD/w/YIPK0Y9pfBv0UH6rJ5ZiedUuZiA9Vo=
github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic v0.0.0-20260323112355-f061db7e8419/go.mod h1:Ov33JMUewdOoUgJbYNJt3qL7KQDVHYpoVBCjJXsz8sw= github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic v0.0.0-20260323112355-f061db7e8419/go.mod h1:Ov33JMUewdOoUgJbYNJt3qL7KQDVHYpoVBCjJXsz8sw=
github.com/cloudwego/eino-ext/components/embedding/ark v0.1.1 h1:PM/+XAvJtrBqFlBY15ws0pb0+92XKHQv0ei3M7PIJcQ=
github.com/cloudwego/eino-ext/components/embedding/ark v0.1.1/go.mod h1:6O6x0fHfM3uCLr3lX1DnB/my7fC3WRUA5hpkCkrkZrg=
github.com/cloudwego/eino-ext/components/embedding/dashscope v0.0.0-20260323112355-f061db7e8419 h1:gGnohcgEaHqp5V826Ay0H3fi4TpK8ReWlUPePAnzvA4= github.com/cloudwego/eino-ext/components/embedding/dashscope v0.0.0-20260323112355-f061db7e8419 h1:gGnohcgEaHqp5V826Ay0H3fi4TpK8ReWlUPePAnzvA4=
github.com/cloudwego/eino-ext/components/embedding/dashscope v0.0.0-20260323112355-f061db7e8419/go.mod h1:ekJmA+GLD9vJyZNeODZDBFMiJ92Suy6nF0OY42X3sao= github.com/cloudwego/eino-ext/components/embedding/dashscope v0.0.0-20260323112355-f061db7e8419/go.mod h1:ekJmA+GLD9vJyZNeODZDBFMiJ92Suy6nF0OY42X3sao=
github.com/cloudwego/eino-ext/libs/acl/openai v0.1.2 h1:r9Id2wzJ05PoHl+Km7jQgNMgciaZI93TVnUYso89esM= github.com/cloudwego/eino-ext/components/embedding/openai v0.0.0-20260323112355-f061db7e8419 h1:eM29lyMShtFZNoAhE5g96+zHg9PBLckRyd2HtVeeY4E=
github.com/cloudwego/eino-ext/libs/acl/openai v0.1.2/go.mod h1:S4OkvglPY9hsm9tXeShODrf/WN1Cgu4bqu4nn/CnIic= github.com/cloudwego/eino-ext/components/embedding/openai v0.0.0-20260323112355-f061db7e8419/go.mod h1:SajSFFRIXJXIbxadAAlSUIS5KTY8R/jzJg9RNSOXCCI=
github.com/cloudwego/eino-ext/libs/acl/openai v0.1.14 h1:yOZII6VYaL00CVZYba+HUixFygsW0Xz/1QjQ5htj1Ls=
github.com/cloudwego/eino-ext/libs/acl/openai v0.1.14/go.mod h1:1xMQZ8eE11pkEoTAEy8UlaAY817qGVMvjpDPGSIO3Ns=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
@@ -182,6 +187,8 @@ github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49P
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI= github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
github.com/go-ego/gse v1.0.2 h1:+27lYFPhQEhA9igtdOsJPRKYL/k3TwYsxBF5jr6KFv4=
github.com/go-ego/gse v1.0.2/go.mod h1:Fy35G+q7VV7Et1zIKO8o/sW1kkugV3znXap/lF/11zc=
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
@@ -272,6 +279,7 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/goph/emperror v0.17.2 h1:yLapQcmEsO0ipe9p5TaN22djm3OFV/TfM/fcYP0/J18= github.com/goph/emperror v0.17.2 h1:yLapQcmEsO0ipe9p5TaN22djm3OFV/TfM/fcYP0/J18=
@@ -365,6 +373,10 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
@@ -439,8 +451,8 @@ github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/meguminnnnnnnnn/go-openai v0.1.0 h1:BGzB1PlS2Epq0mBB2TGLwzMihbR7BANrlMH3w4ZnY88= github.com/meguminnnnnnnnn/go-openai v0.1.1 h1:u/IMMgrj/d617Dh/8BKAwlcstD74ynOJzCtVl+y8xAs=
github.com/meguminnnnnnnnn/go-openai v0.1.0/go.mod h1:qs96ysDmxhE4BZoU45I43zcyfnaYxU3X+aRzLko/htY= github.com/meguminnnnnnnnn/go-openai v0.1.1/go.mod h1:qs96ysDmxhE4BZoU45I43zcyfnaYxU3X+aRzLko/htY=
github.com/meilisearch/meilisearch-go v0.36.1 h1:mJTCJE5g7tRvaqKco6DfqOuJEjX+rRltDEnkEC02Y0M= github.com/meilisearch/meilisearch-go v0.36.1 h1:mJTCJE5g7tRvaqKco6DfqOuJEjX+rRltDEnkEC02Y0M=
github.com/meilisearch/meilisearch-go v0.36.1/go.mod h1:hWcR0MuWLSzHfbz9GGzIr3s9rnXLm1jqkmHkJPbUSvM= github.com/meilisearch/meilisearch-go v0.36.1/go.mod h1:hWcR0MuWLSzHfbz9GGzIr3s9rnXLm1jqkmHkJPbUSvM=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4=
@@ -668,6 +680,7 @@ github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@@ -698,10 +711,18 @@ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/vcaesar/cedar v0.30.0 h1:9fSDpM7FTjjUdPiBUUa0MWYMRGSEcqgFXvppZcZ4d7Y=
github.com/vcaesar/cedar v0.30.0/go.mod h1:lyuGvALuZZDPNXwpzv/9LyxW+8Y6faN7zauFezNsnik=
github.com/vcaesar/tt v0.20.1 h1:D/jUeeVCNbq3ad8M7hhtB3J9x5RZ6I1n1eZ0BJp7M+4=
github.com/vcaesar/tt v0.20.1/go.mod h1:cH2+AwGAJm19Wa6xvEa+0r+sXDJBT0QgNQey6mwqLeU=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/volcengine/volc-sdk-golang v1.0.23 h1:anOslb2Qp6ywnsbyq9jqR0ljuO63kg9PY+4OehIk5R8=
github.com/volcengine/volc-sdk-golang v1.0.23/go.mod h1:AfG/PZRUkHJ9inETvbjNifTDgut25Wbkm2QoYBTbvyU=
github.com/volcengine/volcengine-go-sdk v1.0.181 h1:/3PB4M1N4fjMqiSKTJwX43EZ5Nn1HUOtQrSCk+22+wI=
github.com/volcengine/volcengine-go-sdk v1.0.181/go.mod h1:gfEDc1s7SYaGoY+WH2dRrS3qiuDJMkwqyfXWCa7+7oA=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg= github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg=

View File

@@ -11,7 +11,6 @@ import (
_ "gitea.com/red-future/common/consul" _ "gitea.com/red-future/common/consul"
"gitea.com/red-future/common/jaeger" "gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/log/controller"
"gitea.com/red-future/common/utils" "gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gclient" "github.com/gogf/gf/v2/net/gclient"
@@ -70,14 +69,10 @@ func SkipMiddleware(h func(r *ghttp.Request), path string) (handler ghttp.Handle
} }
func RouteRegister(controllers []interface{}) { func RouteRegister(controllers []interface{}) {
Httpserver.Group("/log", func(group *ghttp.RouterGroup) { //Httpserver.Group("/log", func(group *ghttp.RouterGroup) {
group.Middleware(jaeger.NewTracer) // group.Middleware(jaeger.NewTracer)
group.Bind(controller.OperationLog) // group.Bind(controller.OperationLog)
}) //})
Httpserver.BindHandler("/uploadConfig", func(r *ghttp.Request) {
config := g.Config().MustGet(r.GetCtx(), "upload")
r.Response.WriteJsonExit(config)
})
re := regexp.MustCompile("[A-Z]") re := regexp.MustCompile("[A-Z]")
for _, t := range controllers { for _, t := range controllers {
sName := reflect.ValueOf(t).Elem().Type().Name() sName := reflect.ValueOf(t).Elem().Type().Name()

View File

@@ -1081,7 +1081,7 @@ func syncCircuitBreakerStateToDistributed(ctx context.Context, resourceName, sta
} }
// 使用common/redis中的Lock方法获取分布式锁 // 使用common/redis中的Lock方法获取分布式锁
success, err := redis.Lock(ctx, lockKey, lockTimeout, func(ctx context.Context) error { success, err := utils.Lock(ctx, lockKey, lockTimeout, func(ctx context.Context) error {
// 设置熔断器状态 // 设置熔断器状态
_, err := redisClient.Do(ctx, "SETEX", stateKey, ttl, state) _, err := redisClient.Do(ctx, "SETEX", stateKey, ttl, state)
if err != nil { if err != nil {
@@ -1335,7 +1335,7 @@ func resetSingleResource(r *ghttp.Request, resourceName string) error {
if redisClient != nil { if redisClient != nil {
lockKey := "circuit_breaker:" + resourceName + ":lock" lockKey := "circuit_breaker:" + resourceName + ":lock"
// 使用较短的锁超时时间 // 使用较短的锁超时时间
success, err := redis.Lock(r.GetCtx(), lockKey, int64(3), func(ctx context.Context) error { success, err := utils.Lock(r.GetCtx(), lockKey, int64(3), func(ctx context.Context) error {
_, err := redisClient.Del(ctx, "circuit_breaker:"+resourceName+":state") _, err := redisClient.Del(ctx, "circuit_breaker:"+resourceName+":state")
if err != nil { if err != nil {
g.Log().Warningf(ctx, "清除分布式熔断状态失败: %s, error: %v", resourceName, err) g.Log().Warningf(ctx, "清除分布式熔断状态失败: %s, error: %v", resourceName, err)

View File

@@ -62,7 +62,7 @@ func IPLimiter(r *ghttp.Request) {
// UserLimiter 用户维度限流中间件(防止单用户滥用) // UserLimiter 用户维度限流中间件(防止单用户滥用)
func UserLimiter(r *ghttp.Request) { func UserLimiter(r *ghttp.Request) {
if r.RequestURI == "/swagger" { if r.RequestURI == "/swagger" || r.RequestURI == "/admin-go/api/v1/pub/captcha/get" || r.RequestURI == "/admin-go/api/v1/system/login" {
r.Middleware.Next() r.Middleware.Next()
return return
} }

8
rag/eino/consts.go Normal file
View File

@@ -0,0 +1,8 @@
package eino
const (
providerArk = "ark"
providerOpenai = "openai"
providerQianfan = "qianfan"
providerDashscope = "dashscope"
)

View File

@@ -5,59 +5,60 @@ import (
"github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive" "github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive"
"github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic" "github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic"
"github.com/cloudwego/eino/components/document"
"github.com/cloudwego/eino/schema" "github.com/cloudwego/eino/schema"
"github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/frame/g"
) )
// 全局只初始化一次
var (
splitter document.Transformer
)
// SemanticSplitDocument 语义分割文档 // SemanticSplitDocument 语义分割文档
func SemanticSplitDocument(ctx context.Context, docs []*schema.Document) (res []*schema.Document, err error) { func SemanticSplitDocument(ctx context.Context, docs []*schema.Document) (res []*schema.Document, err error) {
if g.IsEmpty(splitter) { // 默认分隔符(支持中英文)
// 默认分隔符(支持中英文) separators := []string{"\n\n", "\n", "。", "", "", "", ".", "!", "?", ";"}
separators := []string{"\n\n", "\n", "。", "", "", "", ".", "!", "?", ";"} // 读取配置,使用合理的默认值
// 读取配置,使用合理的默认值 bufferSize := g.Cfg().MustGet(ctx, "eino.splitter.bufferSize").Int()
bufferSize := g.Cfg().MustGet(ctx, "eino.splitter.bufferSize").Int() minChunkSize := g.Cfg().MustGet(ctx, "eino.splitter.minChunkSize").Int()
percentile := g.Cfg().MustGet(ctx, "eino.splitter.percentile").Float64() percentile := g.Cfg().MustGet(ctx, "eino.splitter.percentile").Float64()
batchSize := g.Cfg().MustGet(ctx, "eino.splitter.batchSize").Int() batchSize := g.Cfg().MustGet(ctx, "eino.splitter.batchSize").Int()
if batchSize <= 0 { if batchSize <= 0 {
batchSize = 10 // doubao-embedding-vision 限制每批最多 10 个 batchSize = 10 // doubao-embedding-vision 限制每批最多 10 个
} }
// 使用批量包装器 // 使用批量包装器
batchEmbedder := NewBatchEmbedder(Embedder, batchSize) var batchEmbedder *BatchEmbedder
provider := g.Cfg().MustGet(ctx, "eino.embedding.provider").String()
switch provider {
case providerArk:
batchEmbedder = NewBatchEmbedder(EmbedderArk, batchSize)
case providerOpenai:
batchEmbedder = NewBatchEmbedder(EmbedderOpenAI, batchSize)
case providerDashscope:
batchEmbedder = NewBatchEmbedder(EmbedderDashscope, batchSize)
}
splitter, err = semantic.NewSplitter(ctx, &semantic.Config{ splitter, err := semantic.NewSplitter(ctx, &semantic.Config{
Embedding: batchEmbedder, Embedding: batchEmbedder,
BufferSize: bufferSize, BufferSize: bufferSize,
Percentile: percentile, MinChunkSize: minChunkSize,
Separators: separators, Percentile: percentile,
}) Separators: separators,
if err != nil { })
return if err != nil {
} return
} }
return splitter.Transform(ctx, docs) return splitter.Transform(ctx, docs)
} }
// RecursiveSplitDocument 递归分割文档 // RecursiveSplitDocument 递归分割文档
func RecursiveSplitDocument(ctx context.Context, docs []*schema.Document) (res []*schema.Document, err error) { func RecursiveSplitDocument(ctx context.Context, docs []*schema.Document) (res []*schema.Document, err error) {
if g.IsEmpty(splitter) { // 默认分隔符(支持中英文)
// 默认分隔符(支持中英文) separators := []string{"\n\n", "\n", "。", "", "", "", ".", "!", "?", ";"}
separators := []string{"\n\n", "\n", "。", "", "", "", ".", "!", "?", ";"} splitter, err := recursive.NewSplitter(ctx, &recursive.Config{
splitter, err = recursive.NewSplitter(ctx, &recursive.Config{ ChunkSize: 512,
ChunkSize: 1500, OverlapSize: 100,
OverlapSize: 300, KeepType: recursive.KeepTypeNone,
KeepType: recursive.KeepTypeNone, Separators: separators,
Separators: separators, })
}) if err != nil {
if err != nil { return
return
}
} }
return splitter.Transform(ctx, docs) return splitter.Transform(ctx, docs)
} }

View File

@@ -2,45 +2,68 @@ package eino
import ( import (
"context" "context"
"fmt"
"github.com/cloudwego/eino-ext/components/embedding/ark"
"github.com/cloudwego/eino-ext/components/embedding/dashscope" "github.com/cloudwego/eino-ext/components/embedding/dashscope"
"github.com/cloudwego/eino-ext/components/embedding/openai"
"github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/frame/g"
"github.com/golang/glog" "github.com/golang/glog"
) )
// 全局只初始化一次 // 全局只初始化一次
var ( var (
Embedder *dashscope.Embedder // 导出供其他模块使用 EmbedderArk *ark.Embedder
EmbedderDashscope *dashscope.Embedder
EmbedderOpenAI *openai.Embedder
) )
// init程序启动时自动执行一次
func init() { func init() {
ctx := context.Background() ctx := context.Background()
if !g.Cfg().MustGet(ctx, "eino.embedding").IsEmpty() { if !g.Cfg().MustGet(ctx, "eino.embedding").IsEmpty() {
var err error var err error
cfg := &dashscope.EmbeddingConfig{ provider := g.Cfg().MustGet(ctx, "eino.embedding.provider").String()
APIKey: g.Cfg().MustGet(ctx, "eino.embedding.apiKey").String(), switch provider {
Model: g.Cfg().MustGet(ctx, "eino.embedding.model").String(), case providerArk:
cfg := &ark.EmbeddingConfig{
APIKey: g.Cfg().MustGet(ctx, "eino.embedding.apiKey").String(),
Model: g.Cfg().MustGet(ctx, "eino.embedding.model").String(),
}
if apiType := g.Cfg().MustGet(ctx, "eino.embedding.apiType").String(); apiType != "" {
apiTypeVal := ark.APIType(apiType)
cfg.APIType = &apiTypeVal
}
EmbedderArk, err = ark.NewEmbedder(ctx, cfg)
case providerOpenai:
chatModelConfig := &openai.EmbeddingConfig{
APIKey: g.Cfg().MustGet(ctx, "eino.embedding.apiKey").String(),
Model: g.Cfg().MustGet(ctx, "eino.embedding.model").String(),
}
EmbedderOpenAI, err = openai.NewEmbedder(ctx, chatModelConfig)
case providerDashscope:
cfg := &dashscope.EmbeddingConfig{
APIKey: g.Cfg().MustGet(ctx, "eino.embedding.apiKey").String(),
Model: g.Cfg().MustGet(ctx, "eino.embedding.model").String(),
}
EmbedderDashscope, err = dashscope.NewEmbedder(ctx, cfg)
} }
// 检查是否配置了 APIType支持 "text_api" 和 "multi_modal_api"
//if apiType := g.Cfg().MustGet(ctx, "eino.embedding.apiType").String(); apiType != "" {
// apiTypeVal := dashscope.APIType(apiType)
// cfg.APIType = &apiTypeVal
//}
Embedder, err = dashscope.NewEmbedder(ctx, cfg)
if err != nil { if err != nil {
glog.Fatalf("NewEmbedder of ark error: %v", err) glog.Fatalf("NewEmbedder of %v error: %v", provider, err)
} }
//embedding, err := embedder.EmbedStrings(ctx, []string{"hello world", "bye bye"})
//if err != nil {
// log.Printf("embedding error: %v\n", err)
// return
//}
//
//log.Printf("embedding: %v\n", embedding)
} }
return
} }
func EmbedStrings(ctx context.Context, texts []string) (embeddings [][]float64, err error) { func EmbedStrings(ctx context.Context, texts []string) (embeddings [][]float64, err error) {
return Embedder.EmbedStrings(ctx, texts) provider := g.Cfg().MustGet(ctx, "eino.embedding.provider").String()
switch provider {
case providerArk:
return EmbedderArk.EmbedStrings(ctx, texts)
case providerOpenai:
return EmbedderOpenAI.EmbedStrings(ctx, texts)
case providerDashscope:
return EmbedderDashscope.EmbedStrings(ctx, texts)
}
return nil, fmt.Errorf("unsupported provider: %v", provider)
} }

114
rag/gse/utils.go Normal file
View File

@@ -0,0 +1,114 @@
package gse
import (
"context"
"sort"
"github.com/go-ego/gse"
"github.com/go-ego/gse/hmm/extracker"
"github.com/go-ego/gse/hmm/segment"
"github.com/gogf/gf/v2/os/glog"
)
var GseTool *gseTool
// 初始化函数:程序启动时执行一次
func init() {
var err error
GseTool, err = newGseTool()
if err != nil {
glog.Error(context.Background(), err)
}
}
// gseTool 关键词提取工具gse v1.0.2 标准)
type gseTool struct {
seg gse.Segmenter
tfidf *extracker.TagExtracter
tr *extracker.TextRanker
}
// newGseTool 初始化工具(内置词典 + 停用词)
func newGseTool() (tool *gseTool, err error) {
// 1. 初始化分词器
var seg gse.Segmenter
// 内置词典(无外部文件)
err = seg.LoadDictEmbed()
if err != nil {
return
}
// 内置停用词v1.0.2 标准)
err = seg.LoadStopEmbed()
if err != nil {
return
}
// 2. 初始化 TF-IDF 提取器
tfidf := &extracker.TagExtracter{}
tfidf.WithGse(seg)
err = tfidf.LoadIdf()
if err != nil {
return
}
// 3. 初始化 TextRank 提取器
tr := &extracker.TextRanker{}
tr.WithGse(seg)
tool = &gseTool{
seg: seg,
tfidf: tfidf,
tr: tr,
}
return
}
// Cut 分词(关键词提取唯一正确模式:精确模式 + HMM
func (k *gseTool) Cut(text string) []string {
return k.seg.Cut(text, true)
}
// Keyword 最终输出:关键词 + 权重
type Keyword struct {
Word string `json:"word"`
Score float64 `json:"score"`
}
func (k *gseTool) Extract(text string, topN int) []Keyword {
// 1. 提取 TF-IDF
tfTags := k.extractTFIDF(text, topN)
// 2. 提取 TextRank
trTags := k.extractTextRank(text, topN)
// 3. 合并成最终关键词(业务最常用)
scoreMap := make(map[string]float64)
for _, tag := range tfTags {
scoreMap[tag.Text] = tag.Weight
}
for _, tag := range trTags {
scoreMap[tag.Text] = tag.Weight
}
// 转成切片并排序(高分在前)
res := make([]Keyword, 0, len(scoreMap))
for word, score := range scoreMap {
res = append(res, Keyword{Word: word, Score: score})
}
sort.Slice(res, func(i, j int) bool {
return res[i].Score > res[j].Score
})
return res
}
// ExtractTFIDF TF-IDF 关键词带权重90% 业务:文章标签、搜索、关键词
func (k *gseTool) extractTFIDF(text string, topN int) segment.Segments {
return k.tfidf.ExtractTags(text, topN)
}
// ExtractTextRank TextRank 关键词(带权重)长文本、摘要、语义理解
func (k *gseTool) extractTextRank(text string, topN int) segment.Segments {
return k.tr.TextRank(text, topN)
}

69
rpc/client.go Normal file
View File

@@ -0,0 +1,69 @@
package rpc
import (
"context"
"sync"
"github.com/gogf/gf/v2/frame/g"
consulClient "github.com/rpcxio/rpcx-consul/client"
"github.com/smallnest/rpcx/client"
)
var (
xclientMap = make(map[string]client.XClient)
xclientMu sync.RWMutex
)
// CallWithConsul 基于Consul服务发现调用RPC
func CallWithConsul(ctx context.Context, serviceName string, args, reply interface{}) error {
xclient, err := getOrCreatXClient(serviceName)
if err != nil {
return err
}
err = xclient.Call(ctx, "Mul", args, reply)
if err != nil {
// 调用失败,清理失效客户端
removeXClient(serviceName)
}
return err
}
func removeXClient(serviceName string) {
xclientMu.Lock()
defer xclientMu.Unlock()
if c, ok := xclientMap[serviceName]; ok {
c.Close()
delete(xclientMap, serviceName)
}
}
func getOrCreatXClient(serviceName string) (client.XClient, error) {
// 第一次:读锁,快速判断是否存在
xclientMu.RLock()
if c, ok := xclientMap[serviceName]; ok {
xclientMu.RUnlock()
return c, nil
}
xclientMu.RUnlock()
// 没找到,加写锁准备创建
xclientMu.Lock()
defer xclientMu.Unlock()
// 第二次:双重检查,防止刚被别人创建完
if c, ok := xclientMap[serviceName]; ok {
return c, nil
}
consulAddr := g.Cfg().MustGet(nil, "consul.address").String()
d, err := consulClient.NewConsulDiscovery("rpcx", serviceName, []string{consulAddr}, nil)
if err != nil {
return nil, err
}
c := client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption)
xclientMap[serviceName] = c
return c, nil
}

View File

@@ -1,364 +1,347 @@
package rpc package rpc
import ( //var (
"context" // // pluginsContainer rpcx插件容器全局统一设置
"encoding/json" // // init()中添加链路追踪插件所有client共用此容器
"errors" // pluginsContainer = rpcxClient.NewPluginContainer()
"strings" //
"sync" // // clientPool 连接池缓存key为服务名value为客户端实例
"time" // clientPool = make(map[string]*rpcxClient.OneClient)
//
"gitea.com/red-future/common/consul" // // poolMutex 连接池锁
"gitea.com/red-future/common/jaeger" // poolMutex sync.RWMutex
"github.com/gogf/gf/v2/frame/g" //
rpcxClient "github.com/smallnest/rpcx/client" // // healthCheckInterval 健康检查间隔(秒)
"go.opentelemetry.io/otel/attribute" // healthCheckInterval = 30
"go.opentelemetry.io/otel/codes" //
"go.opentelemetry.io/otel/trace" // // lastHealthCheckTime 上次健康检查时间key为服务名
) // lastHealthCheckTime = make(map[string]time.Time)
//
var ( // // serviceAddrCache 服务地址缓存key为服务名value为地址
// pluginsContainer rpcx插件容器全局统一设置 // serviceAddrCache = make(map[string]string)
// init()中添加链路追踪插件所有client共用此容器 //)
pluginsContainer = rpcxClient.NewPluginContainer() //
//func init() {
// clientPool 连接池缓存key为服务名value为客户端实例 // // 全局设置链路追踪插件所有client共用
clientPool = make(map[string]*rpcxClient.OneClient) // pluginsContainer.Add(&TracingPlugin{})
//
// poolMutex 连接池锁 // // 启动后台健康检查协程
poolMutex sync.RWMutex // go healthCheckLoop()
//}
// healthCheckInterval 健康检查间隔(秒) //
healthCheckInterval = 30 //// healthCheckLoop 后台健康检查循环
//func healthCheckLoop() {
// lastHealthCheckTime 上次健康检查时间key为服务名 // ticker := time.NewTicker(time.Duration(healthCheckInterval) * time.Second)
lastHealthCheckTime = make(map[string]time.Time) // defer ticker.Stop()
//
// serviceAddrCache 服务地址缓存key为服务名value为地址 // for range ticker.C {
serviceAddrCache = make(map[string]string) // checkAllConnections()
) // }
//}
func init() { //
// 全局设置链路追踪插件所有client共用 //// checkAllConnections 检查所有缓存连接的健康状态
pluginsContainer.Add(&TracingPlugin{}) //func checkAllConnections() {
// poolMutex.Lock()
// 启动后台健康检查协程 // defer poolMutex.Unlock()
go healthCheckLoop() //
} // now := time.Now()
// for serviceName, client := range clientPool {
// healthCheckLoop 后台健康检查循环 // // 检查连接是否需要健康检查
func healthCheckLoop() { // if lastCheck, ok := lastHealthCheckTime[serviceName]; ok {
ticker := time.NewTicker(time.Duration(healthCheckInterval) * time.Second) // if now.Sub(lastCheck) < time.Duration(healthCheckInterval)*time.Second {
defer ticker.Stop() // continue
// }
for range ticker.C { // }
checkAllConnections() //
} // ctx := context.Background()
} //
// // 检查连接健康状态(心跳检测)
// checkAllConnections 检查所有缓存连接的健康状态 // if !isClientHealthy(ctx, client, serviceName) {
func checkAllConnections() { // g.Log().Warningf(ctx, "检测到服务[%s]连接不健康,将从连接池移除", serviceName)
poolMutex.Lock() // client.Close()
defer poolMutex.Unlock() // delete(clientPool, serviceName)
// delete(lastHealthCheckTime, serviceName)
now := time.Now() // delete(serviceAddrCache, serviceName)
for serviceName, client := range clientPool { // continue
// 检查连接是否需要健康检查 // }
if lastCheck, ok := lastHealthCheckTime[serviceName]; ok { //
if now.Sub(lastCheck) < time.Duration(healthCheckInterval)*time.Second { // // 连接健康,检查服务地址是否发生变化
continue // currentAddr, err := consul.GetInstanceAddr(ctx, serviceName)
} // if err != nil {
} // g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v保持现有连接", serviceName, err)
// lastHealthCheckTime[serviceName] = now
ctx := context.Background() // continue
// }
// 检查连接健康状态(心跳检测) //
if !isClientHealthy(ctx, client, serviceName) { // // 检查地址是否发生变化
g.Log().Warningf(ctx, "检测到服务[%s]连接不健康,将从连接池移除", serviceName) // if oldAddr, ok := serviceAddrCache[serviceName]; ok && oldAddr != currentAddr {
client.Close() // g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s重建连接", serviceName, oldAddr, currentAddr)
delete(clientPool, serviceName) // // 关闭旧连接并从连接池移除,下次请求时会创建新连接
delete(lastHealthCheckTime, serviceName) // client.Close()
delete(serviceAddrCache, serviceName) // delete(clientPool, serviceName)
continue // delete(lastHealthCheckTime, serviceName)
} // // 更新缓存的新地址
// serviceAddrCache[serviceName] = currentAddr
// 连接健康,检查服务地址是否发生变化 // } else {
currentAddr, err := consul.GetInstanceAddr(ctx, serviceName) // // 地址未变化,更新检查时间
if err != nil { // if !ok {
g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v保持现有连接", serviceName, err) // serviceAddrCache[serviceName] = currentAddr
lastHealthCheckTime[serviceName] = now // }
continue // g.Log().Debugf(ctx, "服务[%s]地址未变化,连接健康", serviceName)
} // }
//
// 检查地址是否发生变化 // lastHealthCheckTime[serviceName] = now
if oldAddr, ok := serviceAddrCache[serviceName]; ok && oldAddr != currentAddr { // }
g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s重建连接", serviceName, oldAddr, currentAddr) //}
// 关闭旧连接并从连接池移除,下次请求时会创建新连接 //
client.Close() //// isClientHealthy 检查client是否健康
delete(clientPool, serviceName) //// 使用心跳检测方式:尝试调用服务的心跳方法
delete(lastHealthCheckTime, serviceName) //func isClientHealthy(ctx context.Context, client *rpcxClient.OneClient, serviceName string) bool {
// 更新缓存的新地址 // if client == nil {
serviceAddrCache[serviceName] = currentAddr // return false
} else { // }
// 地址未变化,更新检查时间 //
if !ok { // // 设置较短的超时时间,避免阻塞
serviceAddrCache[serviceName] = currentAddr // pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
} // defer cancel()
g.Log().Debugf(ctx, "服务[%s]地址未变化,连接健康", serviceName) //
} // // 尝试调用健康检查方法
// // 大多数服务都会提供 Ping 或 Health 方法
lastHealthCheckTime[serviceName] = now // // 如果服务没有提供这些方法,会返回错误,我们认为是健康的
} // // 因为连接本身是正常的,只是方法不存在
} // var reply interface{}
// err := client.Call(pingCtx, serviceName, "Ping", nil, &reply)
// isClientHealthy 检查client是否健康 //
// 使用心跳检测方式:尝试调用服务的心跳方法 // // 如果调用成功,连接肯定健康
func isClientHealthy(ctx context.Context, client *rpcxClient.OneClient, serviceName string) bool { // if err == nil {
if client == nil { // return true
return false // }
} //
// // 如果是方法不存在的错误说明连接是健康的只是服务没有Ping方法
// 设置较短的超时时间,避免阻塞 // // 这种情况下我们认为是健康的
pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second) // if isMethodNotFoundError(err) || isServiceNotFoundError(err) {
defer cancel() // return true
// }
// 尝试调用健康检查方法 //
// 大多数服务都会提供 Ping 或 Health 方法 // // 其他错误(网络错误、超时等)说明连接不健康
// 如果服务没有提供这些方法,会返回错误,我们认为是健康的 // g.Log().Warningf(ctx, "健康检查失败,服务[%s]连接可能不健康: %v", serviceName, err)
// 因为连接本身是正常的,只是方法不存在 // return false
var reply interface{} //}
err := client.Call(pingCtx, serviceName, "Ping", nil, &reply) //
//// isMethodNotFoundError 判断是否是方法未找到错误
// 如果调用成功,连接肯定健康 //func isMethodNotFoundError(err error) bool {
if err == nil { // if err == nil {
return true // return false
} // }
// errStr := err.Error()
// 如果是方法不存在的错误说明连接是健康的只是服务没有Ping方法 // // rpcx 方法不存在的常见错误信息
// 这种情况下我们认为是健康的 // return strings.Contains(errStr, "not found") ||
if isMethodNotFoundError(err) || isServiceNotFoundError(err) { // strings.Contains(errStr, "no such") ||
return true // strings.Contains(errStr, "service not found") ||
} // strings.Contains(errStr, "method not found")
//}
// 其他错误(网络错误、超时等)说明连接不健康 //
g.Log().Warningf(ctx, "健康检查失败,服务[%s]连接可能不健康: %v", serviceName, err) //// isServiceNotFoundError 判断是否是服务未找到错误
return false //func isServiceNotFoundError(err error) bool {
} // if err == nil {
// return false
// isMethodNotFoundError 判断是否是方法未找到错误 // }
func isMethodNotFoundError(err error) bool { // errStr := err.Error()
if err == nil { // return strings.Contains(errStr, "no service") ||
return false // strings.Contains(errStr, "service not registered")
} //}
errStr := err.Error() //
// rpcx 方法不存在的常见错误信息 //// getOrCreateClient 从连接池获取或创建客户端(带连接池)
return strings.Contains(errStr, "not found") || //func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) {
strings.Contains(errStr, "no such") || // if g.IsEmpty(serviceName) {
strings.Contains(errStr, "service not found") || // return nil, errors.New("服务名称不能为空")
strings.Contains(errStr, "method not found") // }
} //
// // 先尝试从连接池获取
// isServiceNotFoundError 判断是否是服务未找到错误 // poolMutex.RLock()
func isServiceNotFoundError(err error) bool { // client, exists := clientPool[serviceName]
if err == nil { // poolMutex.RUnlock()
return false //
} // // 如果存在且健康,直接返回
errStr := err.Error() // if exists && isClientHealthy(ctx, client, serviceName) {
return strings.Contains(errStr, "no service") || // g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName)
strings.Contains(errStr, "service not registered") // return client, nil
} // }
//
// getOrCreateClient 从连接池获取或创建客户端(带连接池) // // 不存在或不健康,重新创建
func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) { // poolMutex.Lock()
if g.IsEmpty(serviceName) { // defer poolMutex.Unlock()
return nil, errors.New("服务名称不能为空") //
} // // 双重检查,防止并发时重复创建
// if client, exists := clientPool[serviceName]; exists && isClientHealthy(ctx, client, serviceName) {
// 先尝试从连接池获取 // return client, nil
poolMutex.RLock() // }
client, exists := clientPool[serviceName] //
poolMutex.RUnlock() // // 获取服务实例地址
// addr, err := consul.GetInstanceAddr(ctx, serviceName)
// 如果存在且健康,直接返回 // if err != nil {
if exists && isClientHealthy(ctx, client, serviceName) { // g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err)
g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName) // return nil, err
return client, nil // }
} //
// g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr)
// 不存在或不健康,重新创建 //
poolMutex.Lock() // // 缓存服务地址,用于健康检查时对比
defer poolMutex.Unlock() // serviceAddrCache[serviceName] = addr
//
// 双重检查,防止并发时重复创建 // // 创建服务发现
if client, exists := clientPool[serviceName]; exists && isClientHealthy(ctx, client, serviceName) { // discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "")
return client, nil // if err != nil {
} // g.Log().Errorf(ctx, "创建服务发现失败: %v", err)
// return nil, err
// 获取服务实例地址 // }
addr, err := consul.GetInstanceAddr(ctx, serviceName) //
if err != nil { // // 创建新客户端
g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err) // newClient := rpcxClient.NewOneClient(
return nil, err // rpcxClient.Failtry,
} // rpcxClient.RandomSelect,
// discovery,
g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr) // rpcxClient.DefaultOption,
// )
// 缓存服务地址,用于健康检查时对比 // newClient.SetPlugins(pluginsContainer)
serviceAddrCache[serviceName] = addr //
// // 更新连接池
// 创建服务发现 // if oldClient, ok := clientPool[serviceName]; ok && oldClient != nil {
discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "") // oldClient.Close()
if err != nil { // }
g.Log().Errorf(ctx, "创建服务发现失败: %v", err) // clientPool[serviceName] = newClient
return nil, err // lastHealthCheckTime[serviceName] = time.Now()
} //
// g.Log().Infof(ctx, "rpcx客户端[%s]创建并加入连接池", serviceName)
// 创建新客户端 //
newClient := rpcxClient.NewOneClient( // return newClient, nil
rpcxClient.Failtry, //}
rpcxClient.RandomSelect, //
discovery, //// Call 调用rpcx服务方法
rpcxClient.DefaultOption, //// serviceName: 服务名称
) //// serviceMethod: 服务方法
newClient.SetPlugins(pluginsContainer) //// args: 请求参数
//// reply: 响应结果
// 更新连接池 //func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error {
if oldClient, ok := clientPool[serviceName]; ok && oldClient != nil { // // 从连接池获取客户端(不再关闭连接)
oldClient.Close() // client, err := getOrCreateClient(ctx, serviceName)
} // if err != nil {
clientPool[serviceName] = newClient // g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err)
lastHealthCheckTime[serviceName] = time.Now() // return err
// }
g.Log().Infof(ctx, "rpcx客户端[%s]创建并加入连接池", serviceName) //
// // 设置超时
return newClient, nil // callCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
} // defer cancel()
//
// Call 调用rpcx服务方法 // // 调用服务方法
// serviceName: 服务名称 // err = client.Call(callCtx, serviceName, serviceMethod, args, reply)
// serviceMethod: 服务方法 // if err != nil {
// args: 请求参数 // g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, err)
// reply: 响应结果 //
func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error { // // 如果调用失败,检查连接是否需要重新创建
// 从连接池获取客户端(不再关闭连接) // poolMutex.Lock()
client, err := getOrCreateClient(ctx, serviceName) // if pooledClient, ok := clientPool[serviceName]; ok && pooledClient == client {
if err != nil { // // 标记为不健康,下次请求时会重新创建
g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err) // delete(lastHealthCheckTime, serviceName)
return err // }
} // poolMutex.Unlock()
//
// 设置超时 // return err
callCtx, cancel := context.WithTimeout(ctx, 30*time.Second) // }
defer cancel() //
// return nil
// 调用服务方法 //}
err = client.Call(callCtx, serviceName, serviceMethod, args, reply) //
if err != nil { //// Close 关闭指定服务的连接(用于清理连接池)
g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, err) //func Close(serviceName string) {
// poolMutex.Lock()
// 如果调用失败,检查连接是否需要重新创建 // defer poolMutex.Unlock()
poolMutex.Lock() //
if pooledClient, ok := clientPool[serviceName]; ok && pooledClient == client { // if client, ok := clientPool[serviceName]; ok {
// 标记为不健康,下次请求时会重新创建 // client.Close()
delete(lastHealthCheckTime, serviceName) // delete(clientPool, serviceName)
} // delete(lastHealthCheckTime, serviceName)
poolMutex.Unlock() // delete(serviceAddrCache, serviceName)
// g.Log().Infof(context.Background(), "rpcx客户端[%s]已从连接池移除", serviceName)
return err // }
} //}
//
return nil //// CloseAll 关闭所有连接(用于优雅停机)
} //func CloseAll() {
// poolMutex.Lock()
// Close 关闭指定服务的连接(用于清理连接池) // defer poolMutex.Unlock()
func Close(serviceName string) { //
poolMutex.Lock() // for serviceName, client := range clientPool {
defer poolMutex.Unlock() // client.Close()
// g.Log().Infof(context.Background(), "rpcx客户端[%s]已关闭", serviceName)
if client, ok := clientPool[serviceName]; ok { // }
client.Close() // clientPool = make(map[string]*rpcxClient.OneClient)
delete(clientPool, serviceName) // lastHealthCheckTime = make(map[string]time.Time)
delete(lastHealthCheckTime, serviceName) // serviceAddrCache = make(map[string]string)
delete(serviceAddrCache, serviceName) //}
g.Log().Infof(context.Background(), "rpcx客户端[%s]已从连接池移除", serviceName) //
} //// TracingPlugin rpcx链路追踪插件
} //// 实现 rpcx 的 PreCallPlugin 和 PostCallPlugin 接口
//type TracingPlugin struct{}
// CloseAll 关闭所有连接(用于优雅停机) //
func CloseAll() { //// PreCall 调用前拦截 - 创建jaeger span
poolMutex.Lock() //func (p *TracingPlugin) PreCall(ctx context.Context, serviceName, serviceMethod string, args interface{}) (err error) {
defer poolMutex.Unlock() // // 创建span名称格式: ServiceName.Method
// spanName := serviceName + "." + serviceMethod
for serviceName, client := range clientPool { // ctx, span := jaeger.NewSpan(ctx, spanName)
client.Close() //
g.Log().Infof(context.Background(), "rpcx客户端[%s]已关闭", serviceName) // // 记录服务和方法信息
} // span.SetAttributes(
clientPool = make(map[string]*rpcxClient.OneClient) // attribute.String("rpc.service", serviceName),
lastHealthCheckTime = make(map[string]time.Time) // attribute.String("rpc.method", serviceMethod),
serviceAddrCache = make(map[string]string) // attribute.String("rpc.system", "rpcx"),
} // )
// var data []byte
// TracingPlugin rpcx链路追踪插件 // // 记录请求参数序列化为JSON
// 实现 rpcx 的 PreCallPlugin 和 PostCallPlugin 接口 // if args != nil {
type TracingPlugin struct{} // if data, err = json.Marshal(args); err == nil {
// argsStr := string(data)
// PreCall 调用前拦截 - 创建jaeger span // // 限制长度,避免过大
func (p *TracingPlugin) PreCall(ctx context.Context, serviceName, serviceMethod string, args interface{}) (err error) { // if len(argsStr) > 2000 {
// 创建span名称格式: ServiceName.Method // argsStr = argsStr[:2000] + "... (truncated)"
spanName := serviceName + "." + serviceMethod // }
ctx, span := jaeger.NewSpan(ctx, spanName) // span.SetAttributes(attribute.String("rpc.request", argsStr))
// }
// 记录服务和方法信息 // }
span.SetAttributes( //
attribute.String("rpc.service", serviceName), // g.Log().Debugf(ctx, "[rpcx] 调用开始: %s.%s", serviceName, serviceMethod)
attribute.String("rpc.method", serviceMethod), //
attribute.String("rpc.system", "rpcx"), // return
) //}
var data []byte //
// 记录请求参数序列化为JSON //// PostCall 调用后拦截 - 记录结果和错误
if args != nil { //func (p *TracingPlugin) PostCall(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}, err error) error {
if data, err = json.Marshal(args); err == nil { // span := trace.SpanFromContext(ctx)
argsStr := string(data) // if span != nil && span.IsRecording() {
// 限制长度,避免过大 // defer span.End()
if len(argsStr) > 2000 { //
argsStr = argsStr[:2000] + "... (truncated)" // // 记录响应结果
} // if reply != nil {
span.SetAttributes(attribute.String("rpc.request", argsStr)) // if data, err := json.Marshal(reply); err == nil {
} // replyStr := string(data)
} // // 限制长度,避免过大
// if len(replyStr) > 2000 {
g.Log().Debugf(ctx, "[rpcx] 调用开始: %s.%s", serviceName, serviceMethod) // replyStr = replyStr[:2000] + "... (truncated)"
// }
return // span.SetAttributes(attribute.String("rpc.response", replyStr))
} // }
// }
// PostCall 调用后拦截 - 记录结果和错误 //
func (p *TracingPlugin) PostCall(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}, err error) error { // // 处理错误
span := trace.SpanFromContext(ctx) // if err != nil {
if span != nil && span.IsRecording() { // jaeger.RecordError(ctx, err, "rpcx调用失败")
defer span.End() // span.SetStatus(codes.Error, err.Error())
// g.Log().Errorf(ctx, "[rpcx] 调用失败: %s.%s, 错误: %v", serviceName, serviceMethod, err)
// 记录响应结果 // } else {
if reply != nil { // g.Log().Debugf(ctx, "[rpcx] 调用成功: %s.%s", serviceName, serviceMethod)
if data, err := json.Marshal(reply); err == nil { // }
replyStr := string(data) // }
// 限制长度,避免过大 //
if len(replyStr) > 2000 { // return nil
replyStr = replyStr[:2000] + "... (truncated)" //}
}
span.SetAttributes(attribute.String("rpc.response", replyStr))
}
}
// 处理错误
if err != nil {
jaeger.RecordError(ctx, err, "rpcx调用失败")
span.SetStatus(codes.Error, err.Error())
g.Log().Errorf(ctx, "[rpcx] 调用失败: %s.%s, 错误: %v", serviceName, serviceMethod, err)
} else {
g.Log().Debugf(ctx, "[rpcx] 调用成功: %s.%s", serviceName, serviceMethod)
}
}
return nil
}

93
rpc/server.go Normal file
View File

@@ -0,0 +1,93 @@
package rpc
import (
"context"
"errors"
"fmt"
"net"
"strconv"
"strings"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/rpcxio/rpcx-consul/serverplugin"
"github.com/smallnest/rpcx/server"
)
var rpcServer *server.Server
// Serve 启动RPC服务
func Serve(ctx context.Context, service interface{}) error {
// 获取本机IP
ip, err := getLocalIP()
if err != nil {
return fmt.Errorf("获取IP失败: %w", err)
}
// 解析端口
addrConfig := g.Cfg().MustGet(ctx, "server.address").String()
portStr := strings.TrimPrefix(addrConfig, ":")
port, err := strconv.Atoi(portStr)
if err != nil {
return errors.New("端口解析失败")
}
serviceAddr := fmt.Sprintf("%s:%d", ip, port)
// 创建服务端
rpcServer = server.NewServer()
// 添加 Consul 注册插件
consulAddr := g.Cfg().MustGet(ctx, "consul.address").String()
if consulAddr != "" {
plugin := &serverplugin.ConsulRegisterPlugin{
ServiceAddress: "tcp@" + serviceAddr,
ConsulServers: []string{consulAddr},
BasePath: "rpcx",
UpdateInterval: time.Minute,
}
if err := plugin.Start(); err != nil {
return err
}
rpcServer.Plugins.Add(plugin)
g.Log().Infof(ctx, "Consul注册成功: %s", serviceAddr)
}
// 注册服务
err = rpcServer.Register(service, "")
if err != nil {
g.Log().Errorf(ctx, "注册服务失败: %v", err)
return nil
}
// 优雅关闭
//gproc.AddShutdownFunc(func(ctx context.Context) {
// if rpcServer != nil {
// _ = rpcServer.Shutdown(ctx)
// }
//})
// 异步启动
go func() {
g.Log().Infof(ctx, "RPC服务启动: %s", serviceAddr)
if err := rpcServer.Serve("tcp", serviceAddr); err != nil {
g.Log().Fatalf(ctx, "RPC服务启动失败: %v", err)
}
}()
return nil
}
func getLocalIP() (string, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {
return "", err
}
for _, addr := range addrs {
if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
if ipNet.IP.To4() != nil {
return ipNet.IP.String(), nil
}
}
}
return "", fmt.Errorf("未找到本机IP")
}