Compare commits
10 Commits
d17c242a38
...
d1f80c3109
| Author | SHA1 | Date | |
|---|---|---|---|
| d1f80c3109 | |||
| 7714326db7 | |||
| 9c967eed4c | |||
| 9d8b4d1617 | |||
| ac0aed9cff | |||
| 1cb6786dcb | |||
| b9acc7bc18 | |||
| 27355b47fe | |||
| a8f7c925ff | |||
| ded8a42c64 |
@@ -3,6 +3,8 @@ package beans
|
|||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/gogf/gf/v2/os/gtime"
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/v2/bson"
|
"go.mongodb.org/mongo-driver/v2/bson"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -38,17 +40,44 @@ type MongoBaseDO struct {
|
|||||||
|
|
||||||
// SQLBaseDO SQL数据库基础实体
|
// SQLBaseDO SQL数据库基础实体
|
||||||
type SQLBaseDO struct {
|
type SQLBaseDO struct {
|
||||||
Id uint64 `json:"id"` // 主键ID
|
Id int64 `orm:"id" json:"id"` // 主键ID
|
||||||
Creator string `json:"creator"` // 创建人
|
Creator string `orm:"creator" json:"creator"` // 创建人
|
||||||
CreatedAt *time.Time `json:"createdAt"` // 创建时间
|
CreatedAt *gtime.Time `orm:"created_at" json:"createdAt"` // 创建时间
|
||||||
Updater string `json:"updater"` // 更新人
|
Updater string `orm:"updater" json:"updater"` // 更新人
|
||||||
UpdatedAt *time.Time `json:"updatedAt"` // 更新时间
|
UpdatedAt *gtime.Time `orm:"updated_at" json:"updatedAt"` // 更新时间
|
||||||
TenantId string `json:"tenantId"` // 租户ID
|
DeletedAt *gtime.Time `orm:"deleted_at" json:"deletedAt"` // 软删除时间
|
||||||
IsDeleted bool `json:"isDeleted"` // 是否删除
|
IsDeleted bool `orm:"is_deleted" json:"isDeleted"` // 是否删除
|
||||||
|
}
|
||||||
|
|
||||||
|
type SQLBaseCol struct {
|
||||||
|
Id string
|
||||||
|
Creator string
|
||||||
|
CreatedAt string
|
||||||
|
Updater string
|
||||||
|
UpdatedAt string
|
||||||
|
DeletedAt string
|
||||||
|
IsDeleted string
|
||||||
|
}
|
||||||
|
|
||||||
|
var DefSQLBaseCol = SQLBaseCol{
|
||||||
|
Id: "id",
|
||||||
|
Creator: "creator",
|
||||||
|
CreatedAt: "created_at",
|
||||||
|
Updater: "updater",
|
||||||
|
UpdatedAt: "updated_at",
|
||||||
|
DeletedAt: "deleted_at",
|
||||||
|
IsDeleted: "is_deleted",
|
||||||
}
|
}
|
||||||
|
|
||||||
type User struct {
|
type User struct {
|
||||||
UserId interface{} `bson:"userId" json:"userId"` // 用户ID
|
Id uint64 `orm:"id,primary" json:"id"` //
|
||||||
UserName interface{} `bson:"userName" json:"userName"` // 用户名
|
UserName string `orm:"user_name,unique" json:"userName"` // 用户名
|
||||||
TenantId interface{} `bson:"tenantId" json:"tenantId"` // 租户ID
|
UserNickname string `orm:"user_nickname" json:"userNickname"` // 用户昵称
|
||||||
|
UserPassword string `orm:"user_password" json:"userPassword"` // 登录密码;cmf_password加密
|
||||||
|
UserSalt string `orm:"user_salt" json:"userSalt"` // 加密盐
|
||||||
|
UserStatus uint `orm:"user_status" json:"userStatus"` // 用户状态;0:禁用,1:正常,2:未验证
|
||||||
|
IsAdmin int `orm:"is_admin" json:"isAdmin"` // 是否后台管理员 1 是 0 否
|
||||||
|
Avatar string `orm:"avatar" json:"avatar"` //头像
|
||||||
|
DeptId uint64 `orm:"dept_id" json:"deptId"` //部门id
|
||||||
|
TenantId uint64 `orm:"tenant_id" json:"tenantId"` //租户id
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,10 +6,10 @@ import (
|
|||||||
|
|
||||||
// ModuleAssetId 模块资产ID映射(key-value结构)
|
// ModuleAssetId 模块资产ID映射(key-value结构)
|
||||||
// Key: 服务名,Value: 资产ID
|
// Key: 服务名,Value: 资产ID
|
||||||
var ModuleAssetId = map[string]string{
|
var ModuleAssetId = map[string]int64{
|
||||||
"assets": "696b4acd1be1c8b76c4b4c15", // 资产模块
|
"assets": 1, // 资产模块
|
||||||
"cid": "696f423705e496ba4ccbe665", // 广告模块
|
"cid": 2, // 广告模块
|
||||||
"customerService": "696f421205e496ba4ccbe662", // AI客服模块
|
"customerService": 3, // AI客服模块
|
||||||
}
|
}
|
||||||
|
|
||||||
// 模块类型(值从ModuleAssetId map获取)
|
// 模块类型(值从ModuleAssetId map获取)
|
||||||
@@ -48,7 +48,7 @@ var TenantModuleTypesAd []TenantModuleTypeKV
|
|||||||
var TenantModuleTypesAICs []TenantModuleTypeKV
|
var TenantModuleTypesAICs []TenantModuleTypeKV
|
||||||
|
|
||||||
// GetTenantModuleTypes 获取模块的租户类型列表
|
// GetTenantModuleTypes 获取模块的租户类型列表
|
||||||
func GetTenantModuleTypes(module string) []TenantModuleTypeKV {
|
func GetTenantModuleTypes(module int64) []TenantModuleTypeKV {
|
||||||
switch module {
|
switch module {
|
||||||
case TenantModuleAssets:
|
case TenantModuleAssets:
|
||||||
return TenantModuleTypesAssets
|
return TenantModuleTypesAssets
|
||||||
|
|||||||
477
db/gfdb/gfdb.go
Normal file
477
db/gfdb/gfdb.go
Normal file
@@ -0,0 +1,477 @@
|
|||||||
|
package gfdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.com/red-future/common/utils"
|
||||||
|
"github.com/bwmarrin/snowflake"
|
||||||
|
"github.com/gogf/gf/v2/crypto/gmd5"
|
||||||
|
"github.com/gogf/gf/v2/database/gdb"
|
||||||
|
"github.com/gogf/gf/v2/database/gredis"
|
||||||
|
"github.com/gogf/gf/v2/frame/g"
|
||||||
|
"github.com/gogf/gf/v2/os/gcache"
|
||||||
|
"github.com/gogf/gf/v2/os/glog"
|
||||||
|
"github.com/gogf/gf/v2/text/gstr"
|
||||||
|
"github.com/gogf/gf/v2/util/gconv"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ==================== 缓存管理器(单例) ====================
|
||||||
|
|
||||||
|
var (
|
||||||
|
localCache *gcache.Cache
|
||||||
|
)
|
||||||
|
|
||||||
|
// getLocalCache 获取本地缓存实例
|
||||||
|
func getLocalCache() *gcache.Cache {
|
||||||
|
if localCache == nil {
|
||||||
|
localCache = gcache.New()
|
||||||
|
}
|
||||||
|
return localCache
|
||||||
|
}
|
||||||
|
|
||||||
|
// getFromCache 从缓存获取数据(本地缓存 -> Redis)
|
||||||
|
func getFromCache(ctx context.Context, key string) ([]byte, bool) {
|
||||||
|
|
||||||
|
// 1. 先查本地缓存
|
||||||
|
if val, err := getLocalCache().Get(ctx, key); err == nil && val != nil {
|
||||||
|
if data := val.Bytes(); len(data) > 0 {
|
||||||
|
return data, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 再查Redis缓存
|
||||||
|
if g.Redis() != nil {
|
||||||
|
result, err := g.Redis().Get(ctx, key)
|
||||||
|
if err == nil && !result.IsEmpty() {
|
||||||
|
data := result.Bytes()
|
||||||
|
// 写入本地缓存
|
||||||
|
err = getLocalCache().Set(ctx, key, data, time.Duration(g.Cfg().MustGet(ctx, "cache.localTTL").Int64())*time.Second)
|
||||||
|
if err != nil {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return data, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// setToCache 写入缓存(本地缓存 + Redis)
|
||||||
|
func setToCache(ctx context.Context, key string, data []byte) (err error) {
|
||||||
|
if len(data) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. 写入本地缓存
|
||||||
|
if err = getLocalCache().Set(ctx, key, data, time.Duration(g.Cfg().MustGet(ctx, "cache.localTTL").Int64())*time.Second); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 写入Redis缓存
|
||||||
|
if g.Redis() != nil {
|
||||||
|
_, err = g.Redis().Set(ctx, key, data, gredis.SetOption{
|
||||||
|
TTLOption: gredis.TTLOption{
|
||||||
|
EX: gconv.PtrInt64(g.Cfg().MustGet(ctx, "cache.redisTTL")),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteCacheByPattern 根据模式删除缓存
|
||||||
|
func deleteCacheByPattern(ctx context.Context, pattern string) (err error) {
|
||||||
|
// 1. 删除匹配模式的本地缓存
|
||||||
|
localCache := getLocalCache()
|
||||||
|
keys := localCache.MustKeyStrings(ctx)
|
||||||
|
if len(keys) > 0 {
|
||||||
|
for _, key := range keys {
|
||||||
|
if matchPattern(key, pattern) {
|
||||||
|
_, err = localCache.Remove(ctx, key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 删除Redis缓存(使用SCAN+DEL)
|
||||||
|
if g.Redis() != nil {
|
||||||
|
|
||||||
|
keys, err := g.Redis().Keys(ctx, pattern)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, key := range keys {
|
||||||
|
_, err = g.Redis().Del(ctx, key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// matchPattern 检查 key 是否匹配 Redis SCAN 的 MATCH 模式(支持 * 通配符)
|
||||||
|
func matchPattern(key string, pattern string) bool {
|
||||||
|
// 将 Redis 的 MATCH 模式转换为正则表达式
|
||||||
|
// 转义正则特殊字符(除了 *)
|
||||||
|
regexPattern := regexp.QuoteMeta(pattern)
|
||||||
|
// 将转义后的 \* 替换回 .*
|
||||||
|
regexPattern = strings.ReplaceAll(regexPattern, `\*`, ".*")
|
||||||
|
// 添加开始和结束锚点
|
||||||
|
regexPattern = "^" + regexPattern + "$"
|
||||||
|
|
||||||
|
matched, _ := regexp.MatchString(regexPattern, key)
|
||||||
|
return matched
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== 统一Hook入口 ====================
|
||||||
|
|
||||||
|
// CatchSQLHook 返回统一的 HookHandler(包含租户自动赋值和缓存)
|
||||||
|
// 使用示例:
|
||||||
|
//
|
||||||
|
// // 基础使用(自动租户赋值,无缓存)
|
||||||
|
// g.DB().Model("user").Hook(base.CatchSQLHook()).Ctx(ctx).Insert(data)
|
||||||
|
//
|
||||||
|
// // 启用缓存(用户无感知,自动处理缓存key)
|
||||||
|
// ctx = base.WithCacheEnabled(ctx, "asset")
|
||||||
|
// Asset.CtxWithCache(ctx).Where("id", 123).Scan(&result)
|
||||||
|
func catchSQLHook() gdb.HookHandler {
|
||||||
|
return gdb.HookHandler{
|
||||||
|
Insert: insertHook,
|
||||||
|
Update: updateHook,
|
||||||
|
Delete: deleteHook,
|
||||||
|
Select: selectHook,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== Insert钩子 ====================
|
||||||
|
|
||||||
|
func insertHook(ctx context.Context, in *gdb.HookInsertInput) (result sql.Result, err error) {
|
||||||
|
|
||||||
|
userInfo, err := utils.GetUserInfo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
node, err := snowflake.NewNode(g.Cfg().MustGet(ctx, "server.workerId").Int64())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for i := range in.Data {
|
||||||
|
if _, ok := in.Data[i]["id"]; ok {
|
||||||
|
in.Data[i]["id"] = node.Generate().Int64()
|
||||||
|
}
|
||||||
|
if !g.IsEmpty(userInfo.UserName) {
|
||||||
|
if _, ok := in.Data[i]["creator"]; ok {
|
||||||
|
in.Data[i]["creator"] = userInfo.UserName
|
||||||
|
}
|
||||||
|
if _, ok := in.Data[i]["updater"]; ok {
|
||||||
|
in.Data[i]["updater"] = userInfo.UserName
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 执行插入
|
||||||
|
result, err = in.Next(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 清除相关缓存
|
||||||
|
if userInfo != nil && userInfo.TenantId != 0 {
|
||||||
|
if err = deleteCacheByPattern(ctx, getCacheKey(userInfo.TenantId, in.Table, true)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== Update钩子 ====================
|
||||||
|
|
||||||
|
func updateHook(ctx context.Context, in *gdb.HookUpdateInput) (result sql.Result, err error) {
|
||||||
|
// 1. 自动赋值修改人
|
||||||
|
userInfo, _ := utils.GetUserInfo(ctx)
|
||||||
|
|
||||||
|
switch data := in.Data.(type) {
|
||||||
|
case gdb.Map:
|
||||||
|
if !g.IsEmpty(userInfo.UserName) {
|
||||||
|
if _, ok := data["updater"]; ok {
|
||||||
|
data["updater"] = userInfo.UserName
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case gdb.List:
|
||||||
|
for i := range data {
|
||||||
|
if !g.IsEmpty(userInfo.UserName) {
|
||||||
|
if _, ok := data[i]["updater"]; ok {
|
||||||
|
data[i]["updater"] = userInfo.UserName
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 执行更新
|
||||||
|
result, err = in.Next(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 清除相关缓存
|
||||||
|
if userInfo != nil && userInfo.TenantId != 0 {
|
||||||
|
if err = deleteCacheByPattern(ctx, getCacheKey(userInfo.TenantId, in.Table, true)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== Delete钩子 ====================
|
||||||
|
|
||||||
|
func deleteHook(ctx context.Context, in *gdb.HookDeleteInput) (result sql.Result, err error) {
|
||||||
|
// 1. 执行删除
|
||||||
|
result, err = in.Next(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 清除相关缓存
|
||||||
|
userInfo, _ := utils.GetUserInfo(ctx)
|
||||||
|
if userInfo != nil && userInfo.TenantId != 0 {
|
||||||
|
if err = deleteCacheByPattern(ctx, getCacheKey(userInfo.TenantId, in.Table, true)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== Select钩子(缓存读取) ====================
|
||||||
|
|
||||||
|
func selectHook(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result, err error) {
|
||||||
|
traceID := getTraceID(ctx)
|
||||||
|
|
||||||
|
enabled, err := gcache.Get(ctx, traceID)
|
||||||
|
// 未启用缓存,直接执行查询
|
||||||
|
if !gconv.Bool(enabled) {
|
||||||
|
return in.Next(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 从 SQL 字符串中提取 WHERE 条件部分
|
||||||
|
whereCondition := ""
|
||||||
|
// 查找 WHERE 关键字(不区分大小写)
|
||||||
|
whereIndex := gstr.PosI(in.Sql, " WHERE ")
|
||||||
|
if whereIndex != -1 {
|
||||||
|
// 提取 WHERE 之后的内容
|
||||||
|
whereCondition = in.Sql[whereIndex+7:]
|
||||||
|
// 移除 ORDER BY, GROUP BY, HAVING, LIMIT 等后续子句
|
||||||
|
for _, keyword := range []string{" ORDER BY ", " GROUP BY ", " HAVING ", " LIMIT ", " FOR UPDATE"} {
|
||||||
|
if idx := gstr.PosI(whereCondition, keyword); idx != -1 {
|
||||||
|
whereCondition = whereCondition[:idx]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
user, err := utils.GetUserInfo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
encrypt, err := gmd5.Encrypt(fmt.Sprintf("%s:%s", whereCondition, in.Args))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构建缓存key:sql:tenantId:table:where条件:args
|
||||||
|
cacheKey := fmt.Sprintf("%s:%s:%s", getCacheKey(user.TenantId, in.Table, false), getSelectTypeString(in.SelectType), encrypt)
|
||||||
|
|
||||||
|
// 1. 先查缓存
|
||||||
|
if data, ok := getFromCache(ctx, cacheKey); ok {
|
||||||
|
var records gdb.Result
|
||||||
|
if err := json.Unmarshal(data, &records); err == nil && len(records) > 0 {
|
||||||
|
return records, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 执行数据库查询
|
||||||
|
result, err = in.Next(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 写入缓存
|
||||||
|
if len(result) > 0 {
|
||||||
|
if data, err := json.Marshal(result); err == nil {
|
||||||
|
if err = setToCache(ctx, cacheKey, data); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCacheKey(tenantId uint64, table string, isBlur bool) string {
|
||||||
|
cacheKey := fmt.Sprintf("sql:tenantId-%v:%s", tenantId, table)
|
||||||
|
if isBlur {
|
||||||
|
cacheKey = fmt.Sprintf("%s:*", cacheKey)
|
||||||
|
}
|
||||||
|
return cacheKey
|
||||||
|
}
|
||||||
|
|
||||||
|
// getSelectTypeString 将 SelectType 枚举转换为可读字符串
|
||||||
|
func getSelectTypeString(selectType gdb.SelectType) string {
|
||||||
|
switch selectType {
|
||||||
|
case gdb.SelectTypeDefault:
|
||||||
|
return "default"
|
||||||
|
case gdb.SelectTypeCount:
|
||||||
|
return "count"
|
||||||
|
case gdb.SelectTypeValue:
|
||||||
|
return "value"
|
||||||
|
case gdb.SelectTypeArray:
|
||||||
|
return "array"
|
||||||
|
default:
|
||||||
|
return "unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== 调用方法 ====================
|
||||||
|
|
||||||
|
var (
|
||||||
|
schemaPrefix = "tenant-"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Gfdb interface {
|
||||||
|
Model(ctx context.Context, tableNameOrStruct ...any) *model
|
||||||
|
Transaction(ctx context.Context, f func(ctx context.Context, tx gdb.TX) error) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type cache interface {
|
||||||
|
Cache(ctx context.Context) *gdb.Model
|
||||||
|
}
|
||||||
|
|
||||||
|
type model struct {
|
||||||
|
*gdb.Model
|
||||||
|
}
|
||||||
|
|
||||||
|
type dataBase struct {
|
||||||
|
gdb.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func DB(ctx context.Context) Gfdb {
|
||||||
|
var dbName []string
|
||||||
|
user, err := utils.GetUserInfo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf(ctx, "[DB] GetUserInfo error: %v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var schema = fmt.Sprintf("%s%v", schemaPrefix, user.TenantId)
|
||||||
|
sprintf := fmt.Sprintf("database.%s", schema)
|
||||||
|
if !g.Cfg().MustGet(ctx, sprintf).IsEmpty() {
|
||||||
|
dbName = append(dbName, schema)
|
||||||
|
} else {
|
||||||
|
dbName = append(dbName, "default")
|
||||||
|
// 配置文件中 default 是数组格式,需要通过索引 0 访问
|
||||||
|
defaultConfig := g.Cfg().MustGet(ctx, "database.default")
|
||||||
|
if defaultConfig.IsSlice() {
|
||||||
|
schema = g.Cfg().MustGet(ctx, "database.default.0.name").String()
|
||||||
|
} else {
|
||||||
|
schema = g.Cfg().MustGet(ctx, "database.default.name").String()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &dataBase{
|
||||||
|
DB: g.DB(dbName...).Schema(schema),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dataBase) Model(ctx context.Context, tableNameOrStruct ...any) *model {
|
||||||
|
user, err := utils.GetUserInfo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf(ctx, "[DB] GetUserInfo error: %v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
m := d.DB.Model(tableNameOrStruct...).Ctx(ctx)
|
||||||
|
|
||||||
|
var schema = fmt.Sprintf("%s%v", schemaPrefix, user.TenantId)
|
||||||
|
sprintf := fmt.Sprintf("database.%s", schema)
|
||||||
|
if !g.Cfg().MustGet(ctx, sprintf).IsEmpty() {
|
||||||
|
// 创建按地区分库的配置
|
||||||
|
shardingConfig := gdb.ShardingConfig{
|
||||||
|
Schema: gdb.ShardingSchemaConfig{
|
||||||
|
Enable: true, // 启用分库
|
||||||
|
Prefix: schemaPrefix, // 分库前缀
|
||||||
|
Rule: &RegionShardingRule{RegionMapping: user.TenantId}, // 自定义分库规则
|
||||||
|
},
|
||||||
|
}
|
||||||
|
m.Sharding(shardingConfig).ShardingValue(user.TenantId)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.OmitNilData().OmitNilWhere().Hook(catchSQLHook())
|
||||||
|
return &model{
|
||||||
|
Model: m,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dataBase) Transaction(ctx context.Context, f func(ctx context.Context, tx gdb.TX) error) error {
|
||||||
|
return d.DB.Transaction(ctx, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *model) Cache(ctx context.Context) *gdb.Model {
|
||||||
|
traceID := getTraceID(ctx)
|
||||||
|
if traceID == "" {
|
||||||
|
glog.Errorf(ctx, "[DB] GetTraceID error: traceID is empty")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := gcache.Set(ctx, traceID, true, time.Second); err != nil {
|
||||||
|
glog.Errorf(ctx, "[DB] Cache error: %v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return d.Model
|
||||||
|
}
|
||||||
|
|
||||||
|
// getTraceID 从 context 中获取链路追踪 ID
|
||||||
|
func getTraceID(ctx context.Context) string {
|
||||||
|
span := trace.SpanFromContext(ctx)
|
||||||
|
if span != nil && span.SpanContext().HasTraceID() {
|
||||||
|
return span.SpanContext().TraceID().String()
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
type RegionShardingRule struct {
|
||||||
|
RegionMapping uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RegionShardingRule) SchemaName(ctx context.Context, config gdb.ShardingSchemaConfig, value any) (string, error) {
|
||||||
|
region, ok := value.(uint64)
|
||||||
|
if !ok {
|
||||||
|
return "", fmt.Errorf("sharding value must be string for RegionShardingRule")
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.RegionMapping == region {
|
||||||
|
return config.Prefix + gconv.String(region), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return "default", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TableName 实现分表规则接口
|
||||||
|
func (r *RegionShardingRule) TableName(ctx context.Context, config gdb.ShardingTableConfig, value any) (string, error) {
|
||||||
|
// 这里不实现分表,返回空字符串
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
3
go.mod
3
go.mod
@@ -1,9 +1,10 @@
|
|||||||
module gitea.com/red-future/common
|
module gitea.com/red-future/common
|
||||||
|
|
||||||
go 1.25.3
|
go 1.25.5
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/alibaba/sentinel-golang v1.0.4
|
github.com/alibaba/sentinel-golang v1.0.4
|
||||||
|
github.com/bwmarrin/snowflake v0.3.0
|
||||||
github.com/cloudwego/eino v0.7.26
|
github.com/cloudwego/eino v0.7.26
|
||||||
github.com/gogf/gf/contrib/registry/consul/v2 v2.9.5
|
github.com/gogf/gf/contrib/registry/consul/v2 v2.9.5
|
||||||
github.com/gogf/gf/contrib/trace/otlphttp/v2 v2.9.5
|
github.com/gogf/gf/contrib/trace/otlphttp/v2 v2.9.5
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -56,6 +56,8 @@ github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMU
|
|||||||
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
|
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
|
||||||
github.com/bugsnag/bugsnag-go v1.4.0/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8=
|
github.com/bugsnag/bugsnag-go v1.4.0/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8=
|
||||||
github.com/bugsnag/panicwrap v1.2.0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE=
|
github.com/bugsnag/panicwrap v1.2.0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE=
|
||||||
|
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
|
||||||
|
github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE=
|
||||||
github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M=
|
github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M=
|
||||||
github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM=
|
github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM=
|
||||||
github.com/bytedance/sonic v1.14.1 h1:FBMC0zVz5XUmE4z9wF4Jey0An5FueFvOsTKKKtwIl7w=
|
github.com/bytedance/sonic v1.14.1 h1:FBMC0zVz5XUmE4z9wF4Jey0An5FueFvOsTKKKtwIl7w=
|
||||||
|
|||||||
87
rpc/rpcx.go
87
rpc/rpcx.go
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -61,7 +62,7 @@ func checkAllConnections() {
|
|||||||
defer poolMutex.Unlock()
|
defer poolMutex.Unlock()
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
for serviceName := range clientPool {
|
for serviceName, client := range clientPool {
|
||||||
// 检查连接是否需要健康检查
|
// 检查连接是否需要健康检查
|
||||||
if lastCheck, ok := lastHealthCheckTime[serviceName]; ok {
|
if lastCheck, ok := lastHealthCheckTime[serviceName]; ok {
|
||||||
if now.Sub(lastCheck) < time.Duration(healthCheckInterval)*time.Second {
|
if now.Sub(lastCheck) < time.Duration(healthCheckInterval)*time.Second {
|
||||||
@@ -69,8 +70,19 @@ func checkAllConnections() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 从consul重新获取服务地址,检查是否发生变化
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// 检查连接健康状态(心跳检测)
|
||||||
|
if !isClientHealthy(ctx, client, serviceName) {
|
||||||
|
g.Log().Warningf(ctx, "检测到服务[%s]连接不健康,将从连接池移除", serviceName)
|
||||||
|
client.Close()
|
||||||
|
delete(clientPool, serviceName)
|
||||||
|
delete(lastHealthCheckTime, serviceName)
|
||||||
|
delete(serviceAddrCache, serviceName)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 连接健康,检查服务地址是否发生变化
|
||||||
currentAddr, err := consul.GetInstanceAddr(ctx, serviceName)
|
currentAddr, err := consul.GetInstanceAddr(ctx, serviceName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v,保持现有连接", serviceName, err)
|
g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v,保持现有连接", serviceName, err)
|
||||||
@@ -82,10 +94,9 @@ func checkAllConnections() {
|
|||||||
if oldAddr, ok := serviceAddrCache[serviceName]; ok && oldAddr != currentAddr {
|
if oldAddr, ok := serviceAddrCache[serviceName]; ok && oldAddr != currentAddr {
|
||||||
g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s,重建连接", serviceName, oldAddr, currentAddr)
|
g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s,重建连接", serviceName, oldAddr, currentAddr)
|
||||||
// 关闭旧连接并从连接池移除,下次请求时会创建新连接
|
// 关闭旧连接并从连接池移除,下次请求时会创建新连接
|
||||||
if client, exists := clientPool[serviceName]; exists {
|
|
||||||
client.Close()
|
client.Close()
|
||||||
delete(clientPool, serviceName)
|
delete(clientPool, serviceName)
|
||||||
}
|
delete(lastHealthCheckTime, serviceName)
|
||||||
// 更新缓存的新地址
|
// 更新缓存的新地址
|
||||||
serviceAddrCache[serviceName] = currentAddr
|
serviceAddrCache[serviceName] = currentAddr
|
||||||
} else {
|
} else {
|
||||||
@@ -93,20 +104,68 @@ func checkAllConnections() {
|
|||||||
if !ok {
|
if !ok {
|
||||||
serviceAddrCache[serviceName] = currentAddr
|
serviceAddrCache[serviceName] = currentAddr
|
||||||
}
|
}
|
||||||
g.Log().Debugf(ctx, "服务[%s]地址未变化,保持现有连接", serviceName)
|
g.Log().Debugf(ctx, "服务[%s]地址未变化,连接健康", serviceName)
|
||||||
}
|
}
|
||||||
|
|
||||||
lastHealthCheckTime[serviceName] = now
|
lastHealthCheckTime[serviceName] = now
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// isClientHealthy 检查client是否健康(简化版)
|
// isClientHealthy 检查client是否健康
|
||||||
// 实际健康检查依赖调用失败时触发重建
|
// 使用心跳检测方式:尝试调用服务的心跳方法
|
||||||
func isClientHealthy(client *rpcxClient.OneClient) bool {
|
func isClientHealthy(ctx context.Context, client *rpcxClient.OneClient, serviceName string) bool {
|
||||||
// rpcx有内置的重连机制,我们信任client对象的有效性
|
if client == nil {
|
||||||
// 只要client不为nil就认为是健康的
|
return false
|
||||||
// 实际的错误会在调用时暴露,触发重新创建
|
}
|
||||||
return client != nil
|
|
||||||
|
// 设置较短的超时时间,避免阻塞
|
||||||
|
pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// 尝试调用健康检查方法
|
||||||
|
// 大多数服务都会提供 Ping 或 Health 方法
|
||||||
|
// 如果服务没有提供这些方法,会返回错误,我们认为是健康的
|
||||||
|
// 因为连接本身是正常的,只是方法不存在
|
||||||
|
var reply interface{}
|
||||||
|
err := client.Call(pingCtx, serviceName, "Ping", nil, &reply)
|
||||||
|
|
||||||
|
// 如果调用成功,连接肯定健康
|
||||||
|
if err == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果是方法不存在的错误,说明连接是健康的,只是服务没有Ping方法
|
||||||
|
// 这种情况下我们认为是健康的
|
||||||
|
if isMethodNotFoundError(err) || isServiceNotFoundError(err) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// 其他错误(网络错误、超时等)说明连接不健康
|
||||||
|
g.Log().Warningf(ctx, "健康检查失败,服务[%s]连接可能不健康: %v", serviceName, err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// isMethodNotFoundError 判断是否是方法未找到错误
|
||||||
|
func isMethodNotFoundError(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
errStr := err.Error()
|
||||||
|
// rpcx 方法不存在的常见错误信息
|
||||||
|
return strings.Contains(errStr, "not found") ||
|
||||||
|
strings.Contains(errStr, "no such") ||
|
||||||
|
strings.Contains(errStr, "service not found") ||
|
||||||
|
strings.Contains(errStr, "method not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
// isServiceNotFoundError 判断是否是服务未找到错误
|
||||||
|
func isServiceNotFoundError(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
errStr := err.Error()
|
||||||
|
return strings.Contains(errStr, "no service") ||
|
||||||
|
strings.Contains(errStr, "service not registered")
|
||||||
}
|
}
|
||||||
|
|
||||||
// getOrCreateClient 从连接池获取或创建客户端(带连接池)
|
// getOrCreateClient 从连接池获取或创建客户端(带连接池)
|
||||||
@@ -121,7 +180,7 @@ func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.One
|
|||||||
poolMutex.RUnlock()
|
poolMutex.RUnlock()
|
||||||
|
|
||||||
// 如果存在且健康,直接返回
|
// 如果存在且健康,直接返回
|
||||||
if exists && isClientHealthy(client) {
|
if exists && isClientHealthy(ctx, client, serviceName) {
|
||||||
g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName)
|
g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName)
|
||||||
return client, nil
|
return client, nil
|
||||||
}
|
}
|
||||||
@@ -131,7 +190,7 @@ func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.One
|
|||||||
defer poolMutex.Unlock()
|
defer poolMutex.Unlock()
|
||||||
|
|
||||||
// 双重检查,防止并发时重复创建
|
// 双重检查,防止并发时重复创建
|
||||||
if client, exists := clientPool[serviceName]; exists && isClientHealthy(client) {
|
if client, exists := clientPool[serviceName]; exists && isClientHealthy(ctx, client, serviceName) {
|
||||||
return client, nil
|
return client, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/gogf/gf/v2/os/gtime"
|
|
||||||
"net"
|
"net"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
@@ -18,6 +17,7 @@ import (
|
|||||||
"github.com/gogf/gf/v2/errors/gcode"
|
"github.com/gogf/gf/v2/errors/gcode"
|
||||||
"github.com/gogf/gf/v2/errors/gerror"
|
"github.com/gogf/gf/v2/errors/gerror"
|
||||||
"github.com/gogf/gf/v2/frame/g"
|
"github.com/gogf/gf/v2/frame/g"
|
||||||
|
"github.com/gogf/gf/v2/os/gtime"
|
||||||
"github.com/gogf/gf/v2/util/gconv"
|
"github.com/gogf/gf/v2/util/gconv"
|
||||||
"github.com/tiger1103/gfast-token/gftoken"
|
"github.com/tiger1103/gfast-token/gftoken"
|
||||||
)
|
)
|
||||||
@@ -66,19 +66,29 @@ func GetMonthToday(t time.Time, month int) time.Time {
|
|||||||
return target.AddDate(0, 0, t.Day()-1)
|
return target.AddDate(0, 0, t.Day()-1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetUserInfo(ctx context.Context) (user beans.User, err error) {
|
func GetUserInfo(ctx context.Context) (user *beans.User, err error) {
|
||||||
// 检查context是否已取消
|
// 1. 优先从 context 中获取
|
||||||
select {
|
if !g.IsNil(ctx.Value("user")) {
|
||||||
case <-ctx.Done():
|
err = gconv.Struct(ctx.Value("user"), &user)
|
||||||
return user, ctx.Err()
|
if err != nil {
|
||||||
default:
|
return user, gerror.Wrap(err, "用户信息转换失败")
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if !g.IsNil(ctx.Value("id")) || !g.IsNil(ctx.Value("userName")) || !g.IsNil(ctx.Value("tenantId")) {
|
// 2. 从请求头中获取(gateway 转发时设置)
|
||||||
user.UserId = ctx.Value("id")
|
if req := g.RequestFromCtx(ctx); req != nil {
|
||||||
user.UserName = ctx.Value("userName")
|
userInfoHeader := req.Header.Get("X-User-Info")
|
||||||
user.TenantId = ctx.Value("tenantId")
|
if userInfoHeader != "" {
|
||||||
} else {
|
err = gconv.Struct(userInfoHeader, &user)
|
||||||
|
if err != nil {
|
||||||
|
return user, gerror.Wrap(err, "请求头用户信息解析失败")
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 从 token 解析
|
||||||
redisAddr := g.Cfg().MustGet(ctx, "redis.default.address").String()
|
redisAddr := g.Cfg().MustGet(ctx, "redis.default.address").String()
|
||||||
gft := gftoken.NewGfToken(
|
gft := gftoken.NewGfToken(
|
||||||
gftoken.WithCacheKey("gfToken:"),
|
gftoken.WithCacheKey("gfToken:"),
|
||||||
@@ -96,7 +106,7 @@ func GetUserInfo(ctx context.Context) (user beans.User, err error) {
|
|||||||
var tokenData *gftoken.TokenData
|
var tokenData *gftoken.TokenData
|
||||||
tokenData, _, err = gft.GetTokenData(ctx, ctx.Value("token").(string))
|
tokenData, _, err = gft.GetTokenData(ctx, ctx.Value("token").(string))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return user, gerror.Wrap(err, "token 解析失败")
|
return user, gerror.Wrap(err, "ctx token 解析失败")
|
||||||
}
|
}
|
||||||
var code int
|
var code int
|
||||||
if data, code = gft.IsNotExpired(tokenData.JwtToken); code != gftoken.JwtTokenOK {
|
if data, code = gft.IsNotExpired(tokenData.JwtToken); code != gftoken.JwtTokenOK {
|
||||||
@@ -116,17 +126,11 @@ func GetUserInfo(ctx context.Context) (user beans.User, err error) {
|
|||||||
}
|
}
|
||||||
// 检查 data.Data 是否为 nil
|
// 检查 data.Data 是否为 nil
|
||||||
if data.Data == nil {
|
if data.Data == nil {
|
||||||
g.Log().Errorf(ctx, "data.Data 为空")
|
|
||||||
return user, gerror.New("用户信息为空")
|
return user, gerror.New("用户信息为空")
|
||||||
}
|
}
|
||||||
dataMap := gconv.Map(data.Data)
|
err = gconv.Struct(data.Data, &user)
|
||||||
user.UserId = dataMap["id"]
|
if err != nil {
|
||||||
user.UserName = dataMap["userName"]
|
return user, gerror.Wrap(err, "用户信息转换失败")
|
||||||
user.TenantId = dataMap["tenantId"]
|
|
||||||
}
|
|
||||||
|
|
||||||
if g.IsNil(user.UserId) && g.IsNil(user.UserName) && g.IsNil(user.TenantId) {
|
|
||||||
return user, gerror.New("租户信息为空")
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user