6 Commits

Author SHA1 Message Date
qhd
62a09c0e49 feat: 去掉跨域,在网关统一做跨域处理 2026-04-02 16:37:32 +08:00
qhd
0c2e36f607 feat: gfdb增加noTenantId 2026-04-02 10:37:31 +08:00
b1c62b3f16 mongo初始化控制台打印优化 2026-04-02 09:12:59 +08:00
81d8b70f0d Dockerfile 2026-04-01 16:18:04 +08:00
41b2a37fc0 Dockerfile 2026-04-01 14:19:50 +08:00
0ad6bc9438 Dockerfile 2026-04-01 14:11:07 +08:00
5 changed files with 149 additions and 65 deletions

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"net/http"
"sync"
"time"
@@ -20,6 +21,8 @@ var (
consulAddr string
reconnectMutex sync.RWMutex
reconnectDone chan struct{}
connected bool
httpClient *http.Client
)
// connectConsul 连接 Consul
@@ -27,6 +30,11 @@ func connectConsul(ctx context.Context) error {
reconnectMutex.Lock()
defer reconnectMutex.Unlock()
// 如果已经连接,不再重复连接
if connected && registry != nil {
return nil
}
var err error
registry, err = consul.New(consul.WithAddress(consulAddr))
if err != nil {
@@ -36,11 +44,9 @@ func connectConsul(ctx context.Context) error {
gsvc.SetRegistry(registry)
gsel.SetBuilder(gsel.NewBuilderRoundRobin())
connected = true
g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr)
// 启动健康检查和自动重连
go startHealthCheckAndReconnect()
return nil
}
@@ -51,11 +57,16 @@ func startHealthCheckAndReconnect() {
}
reconnectDone = make(chan struct{})
ticker := time.NewTicker(5 * time.Second)
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
ctx := context.Background()
// 初始化HTTP客户端用于健康检查
httpClient = &http.Client{
Timeout: 5 * time.Second,
}
for {
select {
case <-ticker.C:
@@ -65,8 +76,15 @@ func startHealthCheckAndReconnect() {
}
g.Log().Warning(ctx, "⚠️ Consul 连接异常,尝试重新连接...")
// 重置连接状态并重连
reconnectMutex.Lock()
connected = false
registry = nil
reconnectMutex.Unlock()
if err := connectConsul(ctx); err != nil {
g.Log().Errorf(ctx, "❌ Consul 重连失败: %v,5秒后重试...", err)
g.Log().Errorf(ctx, "❌ Consul 重连失败: %v,30秒后重试...", err)
}
case <-reconnectDone:
@@ -81,18 +99,26 @@ func checkConsulHealth(ctx context.Context) bool {
reconnectMutex.RLock()
defer reconnectMutex.RUnlock()
if registry == nil {
if registry == nil || !connected {
return false
}
// 尝试获取服务列表来检测连接是否正常
services, err := registry.Search(ctx, gsvc.SearchInput{})
// 使用consul原生API进行健康检查
// 调用 /v1/agent/self 接口检测连接状态
url := fmt.Sprintf("http://%s/v1/agent/self", consulAddr)
resp, err := httpClient.Get(url)
if err != nil {
g.Log().Debugf(ctx, "Consul 健康检查失败: %v", err)
return false
}
defer resp.Body.Close()
g.Log().Debugf(ctx, "✅ Consul 健康检查通过,发现 %d 个服务", len(services))
if resp.StatusCode != http.StatusOK {
g.Log().Debugf(ctx, "Consul 健康检查失败,状态码: %d", resp.StatusCode)
return false
}
//g.Log().Debugf(ctx, "✅ Consul 健康检查通过")
return true
}
@@ -102,8 +128,12 @@ func init() {
g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化")
return
}
if err := connectConsul(context.Background()); err != nil {
g.Log().Errorf(context.Background(), "❌ Consul 初始化失败: %v", err)
} else {
// 连接成功后启动健康检查和自动重连
go startHealthCheckAndReconnect()
}
}
func getLocalIP() (string, error) {

View File

@@ -9,6 +9,7 @@ import (
"strings"
"time"
"gitea.com/red-future/common/beans"
"gitea.com/red-future/common/utils"
"github.com/bwmarrin/snowflake"
"github.com/gogf/gf/v2/crypto/gmd5"
@@ -280,11 +281,48 @@ func deleteHook(ctx context.Context, in *gdb.HookDeleteInput) (result sql.Result
// ==================== Select钩子缓存读取 ====================
func selectHook(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result, err error) {
traceID := getTraceID(ctx)
var tenantId uint64
// ===================== 最终版安全追加租户ID =====================
tenantEnabled, err := gcache.Get(ctx, getTraceID(ctx, noTenantIdKeyPrefix))
if err != nil {
return
}
if !gconv.Bool(tenantEnabled) {
user, err := utils.GetUserInfo(ctx)
if err != nil {
return nil, err
}
tenantId = user.TenantId
// 【关键修复】找到 SQL 中第一个出现的 ORDER BY / GROUP BY / LIMIT 等关键字位置
sql := in.Sql
insertPos := len(sql)
keywords := []string{" ORDER BY ", " GROUP BY ", " HAVING ", " LIMIT ", " FOR UPDATE"}
for _, kw := range keywords {
if idx := gstr.PosI(sql, kw); idx != -1 {
insertPos = idx
break
}
}
enabled, err := gcache.Get(ctx, traceID)
// 【正确拼接】把条件插入到关键字之前,而不是直接拼在最后
condition := " " + beans.DefSQLBaseCol.TenantId + " = ?"
if gstr.Contains(gstr.ToUpper(sql), " WHERE ") {
// 有 WHERE → 加 AND
in.Sql = sql[:insertPos] + " AND" + condition + sql[insertPos:]
} else {
// 无 WHERE → 加 WHERE
in.Sql = sql[:insertPos] + " WHERE" + condition + sql[insertPos:]
}
in.Args = append(in.Args, tenantId)
}
// ==================================================================
cacheEnabled, err := gcache.Get(ctx, getTraceID(ctx, cacheKeyPrefix))
if err != nil {
return
}
// 未启用缓存,直接执行查询
if !gconv.Bool(enabled) {
if !gconv.Bool(cacheEnabled) {
return in.Next(ctx)
}
@@ -303,18 +341,13 @@ func selectHook(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result
}
}
user, err := utils.GetUserInfo(ctx)
if err != nil {
return nil, err
}
encrypt, err := gmd5.Encrypt(fmt.Sprintf("%s:%s", whereCondition, in.Args))
if err != nil {
return nil, err
}
// 构建缓存keysql:tenantId:table:where条件:args
cacheKey := fmt.Sprintf("%s:%s:%s", getCacheKey(user.TenantId, in.Table, false), getSelectTypeString(in.SelectType), encrypt)
cacheKey := fmt.Sprintf("%s:%s:%s", getCacheKey(tenantId, in.Table, false), getSelectTypeString(in.SelectType), encrypt)
// 1. 先查缓存
if data, ok := getFromCache(ctx, cacheKey); ok {
@@ -343,7 +376,12 @@ func selectHook(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result
}
func getCacheKey(tenantId uint64, table string, isBlur bool) string {
cacheKey := fmt.Sprintf("sql:tenantId-%v:%s", tenantId, table)
var cacheKey string
if g.IsEmpty(tenantId) {
cacheKey = fmt.Sprintf("sql:%s", table)
} else {
cacheKey = fmt.Sprintf("sql:tenantId-%v:%s", tenantId, table)
}
if isBlur {
cacheKey = fmt.Sprintf("%s:*", cacheKey)
}
@@ -369,7 +407,9 @@ func getSelectTypeString(selectType gdb.SelectType) string {
// ==================== 调用方法 ====================
var (
schemaPrefix = "tenant-"
schemaPrefix = "tenant-"
cacheKeyPrefix = "cache-"
noTenantIdKeyPrefix = "tenantId-"
)
type Gfdb interface {
@@ -382,49 +422,39 @@ type cache interface {
Cache(ctx context.Context) *gdb.Model
}
type model struct {
*gdb.Model
type noTenantId interface {
NoTenantId(ctx context.Context) *modelCache
}
type dataBase struct {
gdb.DB
}
func GetTablePrefix(ctx context.Context) (prefix string, err error) {
tenantId, config, err := checkSchemaConfig(ctx)
if err != nil {
glog.Errorf(ctx, "[DB] checkSchemaConfig error: %v", err)
return
}
if config {
sprintf := fmt.Sprintf("database.%s%v.0.prefix", schemaPrefix, tenantId)
prefix = g.Cfg().MustGet(ctx, sprintf).String()
return
}
prefix = g.Cfg().MustGet(ctx, "database.default.0.prefix").String()
return
type model struct {
*gdb.Model
}
func checkSchemaConfig(ctx context.Context) (uint64, bool, error) {
type modelCache struct {
*model
}
func checkSchemaConfig(ctx context.Context) (uint64, bool) {
user, err := utils.GetUserInfo(ctx)
if err != nil {
glog.Errorf(ctx, "[DB] GetUserInfo error: %v", err)
return 0, false, err
return 0, false
}
var schema = fmt.Sprintf("%s%v", schemaPrefix, user.TenantId)
sprintf := fmt.Sprintf("database.%s", schema)
if !g.Cfg().MustGet(ctx, sprintf).IsEmpty() {
return user.TenantId, true, nil
return user.TenantId, true
}
return user.TenantId, false, nil
return user.TenantId, false
}
func DB(ctx context.Context) Gfdb {
tenantId, config, err := checkSchemaConfig(ctx)
if err != nil {
glog.Errorf(ctx, "[DB] checkSchemaConfig error: %v", err)
return nil
}
tenantId, config := checkSchemaConfig(ctx)
var schema = fmt.Sprintf("%s%v", schemaPrefix, tenantId)
var dbName []string
@@ -450,11 +480,8 @@ func (d *dataBase) Model(ctx context.Context, tableNameOrStruct ...any) *model {
m := d.DB.Model(tableNameOrStruct...).Ctx(ctx)
tenantId, config, err := checkSchemaConfig(ctx)
if err != nil {
glog.Errorf(ctx, "[DB] checkSchemaConfig error: %v", err)
return nil
}
tenantId, config := checkSchemaConfig(ctx)
if config {
// 创建按地区分库的配置
shardingConfig := gdb.ShardingConfig{
@@ -478,7 +505,7 @@ func (d *dataBase) Transaction(ctx context.Context, f func(ctx context.Context,
}
func (d *model) Cache(ctx context.Context) *gdb.Model {
traceID := getTraceID(ctx)
traceID := getTraceID(ctx, cacheKeyPrefix)
if traceID == "" {
glog.Errorf(ctx, "[DB] GetTraceID error: traceID is empty")
return nil
@@ -490,11 +517,28 @@ func (d *model) Cache(ctx context.Context) *gdb.Model {
return d.Model
}
func (d *model) NoTenantId(ctx context.Context) *modelCache {
traceID := getTraceID(ctx, noTenantIdKeyPrefix)
if traceID == "" {
glog.Errorf(ctx, "[DB] GetTraceID error: traceID is empty")
return nil
}
if err := gcache.Set(ctx, traceID, true, time.Second); err != nil {
glog.Errorf(ctx, "[DB] Cache error: %v", err)
return nil
}
return &modelCache{
&model{
Model: d.Model,
},
}
}
// getTraceID 从 context 中获取链路追踪 ID
func getTraceID(ctx context.Context) string {
func getTraceID(ctx context.Context, prefix string) string {
span := trace.SpanFromContext(ctx)
if span != nil && span.SpanContext().HasTraceID() {
return span.SpanContext().TraceID().String()
return fmt.Sprintf("%s%v", prefix, span.SpanContext().TraceID().String())
}
return ""
}
@@ -521,3 +565,19 @@ func (r *RegionShardingRule) TableName(ctx context.Context, config gdb.ShardingT
// 这里不实现分表,返回空字符串
return "", nil
}
func GetTablePrefix(ctx context.Context) (prefix string, err error) {
tenantId, config := checkSchemaConfig(ctx)
if config {
sprintf := fmt.Sprintf("database.%s%v.0.prefix", schemaPrefix, tenantId)
prefix = g.Cfg().MustGet(ctx, sprintf).String()
return
}
defaultConfig := g.Cfg().MustGet(ctx, "database.default")
if defaultConfig.IsSlice() {
prefix = g.Cfg().MustGet(ctx, "database.default.0.prefix").String()
} else {
prefix = g.Cfg().MustGet(ctx, "database.default.prefix").String()
}
return
}

View File

@@ -272,8 +272,8 @@ func init() {
// 从配置初始化多数据源
if err := manager.InitializeFromConfig(ctx); err != nil {
glog.Errorf(ctx, "❌ Failed to initialize MongoDB datasources: %v", err)
} else {
glog.Infof(ctx, "✅ MongoDB datasources initialized: %v", manager.GetAllDataSourceNames())
} else if names := manager.GetAllDataSourceNames(); len(names) > 0 {
glog.Infof(ctx, "✅ MongoDB datasources initialized: %v", names)
}
// 启动健康检查
@@ -291,7 +291,6 @@ func (m *DataSourceManager) InitializeFromConfig(ctx context.Context) error {
// 获取 mongo 配置下的所有子键
mongoConfig := g.Cfg().MustGet(ctx, "mongo")
if mongoConfig.IsNil() {
glog.Warningf(ctx, "no mongo configuration found in config.yml")
return nil
}

View File

@@ -11,7 +11,6 @@ import (
_ "gitea.com/red-future/common/consul"
"gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/log/controller"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gclient"
@@ -70,14 +69,10 @@ func SkipMiddleware(h func(r *ghttp.Request), path string) (handler ghttp.Handle
}
func RouteRegister(controllers []interface{}) {
Httpserver.Group("/log", func(group *ghttp.RouterGroup) {
group.Middleware(jaeger.NewTracer)
group.Bind(controller.OperationLog)
})
Httpserver.BindHandler("/uploadConfig", func(r *ghttp.Request) {
config := g.Config().MustGet(r.GetCtx(), "upload")
r.Response.WriteJsonExit(config)
})
//Httpserver.Group("/log", func(group *ghttp.RouterGroup) {
// group.Middleware(jaeger.NewTracer)
// group.Bind(controller.OperationLog)
//})
re := regexp.MustCompile("[A-Z]")
for _, t := range controllers {
sName := reflect.ValueOf(t).Elem().Type().Name()

View File

@@ -62,7 +62,7 @@ func IPLimiter(r *ghttp.Request) {
// UserLimiter 用户维度限流中间件(防止单用户滥用)
func UserLimiter(r *ghttp.Request) {
if r.RequestURI == "/swagger" {
if r.RequestURI == "/swagger" || r.RequestURI == "/admin-go/api/v1/pub/captcha/get" || r.RequestURI == "/admin-go/api/v1/system/login" {
r.Middleware.Next()
return
}