Files
data-engine/common/report/config/loader.go
2026-06-11 13:06:54 +08:00

665 lines
18 KiB
Go

package config
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"sync"
"dataengine/common/report/model"
"gitea.redpowerfuture.com/red-future/common/db/gfdb"
"github.com/gogf/gf/v2/frame/g"
)
// ConfigLoader 配置加载器
type ConfigLoader struct {
mu sync.RWMutex
// 缓存
businessCache map[string]*model.BusinessConfig
reportCache map[string]*model.ReportConfig
fieldCache map[string][]model.FieldConfig
extractCache map[string][]model.ExtractConfig
}
var (
defaultLoader *ConfigLoader
once sync.Once
)
// GetLoader 获取配置加载器单例
func GetLoader() *ConfigLoader {
once.Do(func() {
defaultLoader = &ConfigLoader{
businessCache: make(map[string]*model.BusinessConfig),
reportCache: make(map[string]*model.ReportConfig),
fieldCache: make(map[string][]model.FieldConfig),
extractCache: make(map[string][]model.ExtractConfig),
}
})
return defaultLoader
}
// GetBusiness 获取业务配置
func (l *ConfigLoader) GetBusiness(ctx context.Context, businessCode string) (*model.BusinessConfig, error) {
l.mu.RLock()
if biz, ok := l.businessCache[businessCode]; ok {
l.mu.RUnlock()
return biz, nil
}
l.mu.RUnlock()
var biz model.BusinessConfig
r, err := gfdb.DB(ctx).Model(ctx, "report_business_config").
Where("business_code", businessCode).
Where("status", model.StatusActive).
One()
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("业务配置不存在: %s", businessCode)
}
return nil, err
}
if r.IsEmpty() {
return nil, fmt.Errorf("业务配置不存在: %s", businessCode)
}
if err = r.Struct(&biz); err != nil {
return nil, err
}
if biz.Config == nil {
biz.Config = make(map[string]interface{})
}
l.mu.Lock()
l.businessCache[businessCode] = &biz
l.mu.Unlock()
return &biz, nil
}
// GetReport 获取报表配置
func (l *ConfigLoader) GetReport(ctx context.Context, businessCode, reportCode string) (*model.ReportConfig, error) {
key := businessCode + ":" + reportCode
l.mu.RLock()
if rpt, ok := l.reportCache[key]; ok {
l.mu.RUnlock()
return rpt, nil
}
l.mu.RUnlock()
var rpt model.ReportConfig
r, err := gfdb.DB(ctx).Model(ctx, "report_report_config").
Where("business_code", businessCode).
Where("report_code", reportCode).
Where("status", model.StatusActive).
One()
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("报表配置不存在: %s/%s", businessCode, reportCode)
}
return nil, err
}
if r.IsEmpty() {
return nil, fmt.Errorf("报表配置不存在: %s/%s", businessCode, reportCode)
}
if err = r.Struct(&rpt); err != nil {
return nil, err
}
if rpt.PrimaryKeys == nil {
rpt.PrimaryKeys = []string{"id"}
}
if rpt.ConflictKeys == nil {
rpt.ConflictKeys = []string{"stat_date"}
}
if rpt.Config == nil {
rpt.Config = make(map[string]interface{})
}
l.mu.Lock()
l.reportCache[key] = &rpt
l.mu.Unlock()
return &rpt, nil
}
// GetFields 获取报表字段配置
func (l *ConfigLoader) GetFields(ctx context.Context, businessCode, reportCode string) ([]model.FieldConfig, error) {
key := businessCode + ":" + reportCode
l.mu.RLock()
if fields, ok := l.fieldCache[key]; ok {
l.mu.RUnlock()
return fields, nil
}
l.mu.RUnlock()
var fields []model.FieldConfig
err := gfdb.DB(ctx).Model(ctx, "report_field_config").
Where("business_code", businessCode).
Where("report_code", reportCode).
Where("status", model.StatusActive).
Order("sort_order ASC").
Scan(&fields)
if err != nil {
return nil, err
}
for i := range fields {
if fields[i].ValidAggregates == nil {
fields[i].ValidAggregates = []string{}
}
if fields[i].FilterOperators == nil {
fields[i].FilterOperators = []string{"=", "!=", ">", "<", ">=", "<=", "IN", "LIKE", "BETWEEN"}
}
}
l.mu.Lock()
l.fieldCache[key] = fields
l.mu.Unlock()
return fields, nil
}
// GetFieldMap 获取字段配置Map
func (l *ConfigLoader) GetFieldMap(ctx context.Context, businessCode, reportCode string) (map[string]*model.FieldConfig, error) {
fields, err := l.GetFields(ctx, businessCode, reportCode)
if err != nil {
return nil, err
}
fieldMap := make(map[string]*model.FieldConfig)
for i := range fields {
fieldMap[fields[i].FieldCode] = &fields[i]
}
return fieldMap, nil
}
// GetExtractConfigs 获取抽取配置
func (l *ConfigLoader) GetExtractConfigs(ctx context.Context, businessCode, reportCode string) ([]model.ExtractConfig, error) {
key := businessCode + ":" + reportCode
l.mu.RLock()
if configs, ok := l.extractCache[key]; ok {
l.mu.RUnlock()
return configs, nil
}
l.mu.RUnlock()
var configs []model.ExtractConfig
err := gfdb.DB(ctx).Model(ctx, "report_extract_config").
Where("business_code", businessCode).
Where("report_code", reportCode).
Where("status", model.StatusActive).
Where("is_enabled", true).
Scan(&configs)
if err != nil {
return nil, err
}
for i := range configs {
if configs[i].JoinConfigs == nil {
configs[i].JoinConfigs = []model.JoinConfig{}
}
if configs[i].FieldMappings == nil {
configs[i].FieldMappings = []model.FieldMapping{}
}
if configs[i].TransformRules == nil {
configs[i].TransformRules = []model.TransformRule{}
}
if configs[i].GroupByFields == nil {
configs[i].GroupByFields = []string{}
}
if configs[i].ExtractMode == "" {
configs[i].ExtractMode = model.ExtractModeDirect
}
}
l.mu.Lock()
l.extractCache[key] = configs
l.mu.Unlock()
return configs, nil
}
// GetExtractLog 获取抽取记录
func (l *ConfigLoader) GetExtractLog(ctx context.Context, businessCode, reportCode, extractCode, statDate string) (*model.ExtractLog, error) {
var log model.ExtractLog
r, err := gfdb.DB(ctx).Model(ctx, "report_extract_log").
Where("business_code", businessCode).
Where("report_code", reportCode).
Where("extract_code", extractCode).
Where("stat_date", statDate).
One()
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
if r.IsEmpty() {
return nil, nil
}
if err = r.Struct(&log); err != nil {
return nil, err
}
return &log, nil
}
// CreateExtractLog 创建抽取记录
func (l *ConfigLoader) CreateExtractLog(ctx context.Context, log *model.ExtractLog) error {
data, _ := json.Marshal(log)
var m map[string]interface{}
json.Unmarshal(data, &m)
delete(m, "id")
_, err := gfdb.DB(ctx).Model(ctx, "report_extract_log").Data(m).Save()
return err
}
// UpdateExtractLog 更新抽取记录
func (l *ConfigLoader) UpdateExtractLog(ctx context.Context, log *model.ExtractLog) error {
data, _ := json.Marshal(log)
var m map[string]interface{}
json.Unmarshal(data, &m)
delete(m, "id")
delete(m, "created_at")
_, err := gfdb.DB(ctx).Model(ctx, "report_extract_log").
Where("business_code", log.BusinessCode).
Where("report_code", log.ReportCode).
Where("extract_code", log.ExtractCode).
Where("stat_date", log.StatDate).
Data(m).
Update()
return err
}
// InvalidateCache 失效缓存
func (l *ConfigLoader) InvalidateCache(businessCode, reportCode string) {
l.mu.Lock()
delete(l.businessCache, businessCode)
delete(l.reportCache, businessCode+":"+reportCode)
delete(l.fieldCache, businessCode+":"+reportCode)
delete(l.extractCache, businessCode+":"+reportCode)
l.mu.Unlock()
}
// InvalidateBusinessCache 只失效业务缓存(不影响报表/字段)
func (l *ConfigLoader) InvalidateBusinessCache(businessCode string) {
l.mu.Lock()
delete(l.businessCache, businessCode)
l.mu.Unlock()
}
// ============================================================
// CRUD: BusinessConfig
// ============================================================
// CreateBusiness 创建业务配置
func (l *ConfigLoader) CreateBusiness(ctx context.Context, biz *model.BusinessConfig) (int64, error) {
data, _ := json.Marshal(biz)
var m map[string]interface{}
json.Unmarshal(data, &m)
delete(m, "id")
delete(m, "created_at")
delete(m, "updated_at")
delete(m, "deleted_at")
result, err := gfdb.DB(ctx).Model(ctx, "report_business_config").Data(m).Insert()
if err != nil {
return 0, fmt.Errorf("创建业务配置失败: %w", err)
}
id, _ := result.LastInsertId()
l.InvalidateBusinessCache(biz.BusinessCode)
return id, nil
}
// UpdateBusiness 更新业务配置
func (l *ConfigLoader) UpdateBusiness(ctx context.Context, biz *model.BusinessConfig) error {
data, _ := json.Marshal(biz)
var m map[string]interface{}
json.Unmarshal(data, &m)
delete(m, "id")
delete(m, "created_at")
delete(m, "deleted_at")
_, err := gfdb.DB(ctx).Model(ctx, "report_business_config").
Where("id", biz.ID).
Data(m).
Update()
if err != nil {
return fmt.Errorf("更新业务配置失败: %w", err)
}
l.InvalidateBusinessCache(biz.BusinessCode)
return nil
}
// DeleteBusiness 删除业务配置(软删除)
func (l *ConfigLoader) DeleteBusiness(ctx context.Context, id int64, businessCode string) error {
_, err := gfdb.DB(ctx).Model(ctx, "report_business_config").
Where("id", id).
Data(map[string]interface{}{
"status": model.StatusInactive,
"deleted_at": "NOW()",
}).
Update()
if err != nil {
return fmt.Errorf("删除业务配置失败: %w", err)
}
l.InvalidateBusinessCache(businessCode)
return nil
}
// GetBusinessByID 根据ID获取业务配置
func (l *ConfigLoader) GetBusinessByID(ctx context.Context, id int64) (*model.BusinessConfig, error) {
var biz model.BusinessConfig
r, err := gfdb.DB(ctx).Model(ctx, "report_business_config").
Where("id", id).
One()
if err != nil {
g.Log().Infof(ctx, "[GetBusinessByID] id=%d, err=%v", id, err)
if err == sql.ErrNoRows {
return nil, fmt.Errorf("业务配置不存在: id=%d", id)
}
return nil, err
}
if r.IsEmpty() {
return nil, fmt.Errorf("业务配置不存在: id=%d", id)
}
if err = r.Struct(&biz); err != nil {
return nil, err
}
g.Log().Infof(ctx, "[GetBusinessByID] id=%d, biz.ID=%d, biz.BusinessCode=%s",
id, biz.ID, biz.BusinessCode)
return &biz, nil
}
// ============================================================
// CRUD: ReportConfig
// ============================================================
// CreateReport 创建报表配置
func (l *ConfigLoader) CreateReport(ctx context.Context, rpt *model.ReportConfig) (int64, error) {
data, _ := json.Marshal(rpt)
var m map[string]interface{}
json.Unmarshal(data, &m)
delete(m, "id")
delete(m, "created_at")
delete(m, "updated_at")
delete(m, "deleted_at")
result, err := gfdb.DB(ctx).Model(ctx, "report_report_config").Data(m).Insert()
if err != nil {
return 0, fmt.Errorf("创建报表配置失败: %w", err)
}
id, _ := result.LastInsertId()
l.InvalidateCache(rpt.BusinessCode, rpt.ReportCode)
return id, nil
}
// UpdateReport 更新报表配置
func (l *ConfigLoader) UpdateReport(ctx context.Context, rpt *model.ReportConfig) error {
data, _ := json.Marshal(rpt)
var m map[string]interface{}
json.Unmarshal(data, &m)
delete(m, "id")
delete(m, "created_at")
delete(m, "deleted_at")
_, err := gfdb.DB(ctx).Model(ctx, "report_report_config").
Where("id", rpt.ID).
Data(m).
Update()
if err != nil {
return fmt.Errorf("更新报表配置失败: %w", err)
}
l.InvalidateCache(rpt.BusinessCode, rpt.ReportCode)
return nil
}
// DeleteReport 删除报表配置(软删除)
func (l *ConfigLoader) DeleteReport(ctx context.Context, id int64, businessCode, reportCode string) error {
_, err := gfdb.DB(ctx).Model(ctx, "report_report_config").
Where("id", id).
Data(map[string]interface{}{
"status": model.StatusInactive,
"deleted_at": "NOW()",
}).
Update()
if err != nil {
return fmt.Errorf("删除报表配置失败: %w", err)
}
l.InvalidateCache(businessCode, reportCode)
return nil
}
// GetReportByID 根据ID获取报表配置
func (l *ConfigLoader) GetReportByID(ctx context.Context, id int64) (*model.ReportConfig, error) {
var rpt model.ReportConfig
r, err := gfdb.DB(ctx).Model(ctx, "report_report_config").
Where("id", id).
One()
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("报表配置不存在: id=%d", id)
}
return nil, err
}
if r.IsEmpty() {
return nil, fmt.Errorf("报表配置不存在: id=%d", id)
}
if err = r.Struct(&rpt); err != nil {
return nil, err
}
return &rpt, nil
}
// ============================================================
// CRUD: FieldConfig
// ============================================================
// CreateField 创建字段配置
func (l *ConfigLoader) CreateField(ctx context.Context, field *model.FieldConfig) (int64, error) {
data, _ := json.Marshal(field)
var m map[string]interface{}
json.Unmarshal(data, &m)
delete(m, "id")
delete(m, "created_at")
delete(m, "updated_at")
delete(m, "deleted_at")
result, err := gfdb.DB(ctx).Model(ctx, "report_field_config").Data(m).Insert()
if err != nil {
return 0, fmt.Errorf("创建字段配置失败: %w", err)
}
id, _ := result.LastInsertId()
l.InvalidateCache(field.BusinessCode, field.ReportCode)
return id, nil
}
// UpdateField 更新字段配置
func (l *ConfigLoader) UpdateField(ctx context.Context, field *model.FieldConfig) error {
data, _ := json.Marshal(field)
var m map[string]interface{}
json.Unmarshal(data, &m)
delete(m, "id")
delete(m, "created_at")
delete(m, "deleted_at")
_, err := gfdb.DB(ctx).Model(ctx, "report_field_config").
Where("id", field.ID).
Data(m).
Update()
if err != nil {
return fmt.Errorf("更新字段配置失败: %w", err)
}
l.InvalidateCache(field.BusinessCode, field.ReportCode)
return nil
}
// DeleteField 删除字段配置(软删除)
func (l *ConfigLoader) DeleteField(ctx context.Context, id int64, businessCode, reportCode string) error {
_, err := gfdb.DB(ctx).Model(ctx, "report_field_config").
Where("id", id).
Data(map[string]interface{}{
"status": model.StatusInactive,
"deleted_at": "NOW()",
}).
Update()
if err != nil {
return fmt.Errorf("删除字段配置失败: %w", err)
}
l.InvalidateCache(businessCode, reportCode)
return nil
}
// GetFieldByID 根据ID获取字段配置
func (l *ConfigLoader) GetFieldByID(ctx context.Context, id int64) (*model.FieldConfig, error) {
var field model.FieldConfig
r, err := gfdb.DB(ctx).Model(ctx, "report_field_config").
Where("id", id).
One()
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("字段配置不存在: id=%d", id)
}
return nil, err
}
if r.IsEmpty() {
return nil, fmt.Errorf("字段配置不存在: id=%d", id)
}
if err = r.Struct(&field); err != nil {
return nil, err
}
return &field, nil
}
// ============================================================
// CRUD: ExtractConfig
// ============================================================
// CreateExtractConfig 创建抽取配置
func (l *ConfigLoader) CreateExtractConfig(ctx context.Context, ec *model.ExtractConfig) (int64, error) {
data, _ := json.Marshal(ec)
var m map[string]interface{}
json.Unmarshal(data, &m)
delete(m, "id")
delete(m, "created_at")
delete(m, "updated_at")
delete(m, "deleted_at")
result, err := gfdb.DB(ctx).Model(ctx, "report_extract_config").Data(m).Insert()
if err != nil {
return 0, fmt.Errorf("创建抽取配置失败: %w", err)
}
id, _ := result.LastInsertId()
l.InvalidateCache(ec.BusinessCode, ec.ReportCode)
return id, nil
}
// UpdateExtractConfig 更新抽取配置
func (l *ConfigLoader) UpdateExtractConfig(ctx context.Context, ec *model.ExtractConfig) error {
data, _ := json.Marshal(ec)
var m map[string]interface{}
json.Unmarshal(data, &m)
delete(m, "id")
delete(m, "created_at")
delete(m, "deleted_at")
_, err := gfdb.DB(ctx).Model(ctx, "report_extract_config").
Where("id", ec.ID).
Data(m).
Update()
if err != nil {
return fmt.Errorf("更新抽取配置失败: %w", err)
}
l.InvalidateCache(ec.BusinessCode, ec.ReportCode)
return nil
}
// DeleteExtractConfig 删除抽取配置(软删除)
func (l *ConfigLoader) DeleteExtractConfig(ctx context.Context, id int64, businessCode, reportCode string) error {
_, err := gfdb.DB(ctx).Model(ctx, "report_extract_config").
Where("id", id).
Data(map[string]interface{}{
"status": model.StatusInactive,
"deleted_at": "NOW()",
}).
Update()
if err != nil {
return fmt.Errorf("删除抽取配置失败: %w", err)
}
l.InvalidateCache(businessCode, reportCode)
return nil
}
// GetExtractConfigByID 根据ID获取抽取配置
func (l *ConfigLoader) GetExtractConfigByID(ctx context.Context, id int64) (*model.ExtractConfig, error) {
var ec model.ExtractConfig
r, err := gfdb.DB(ctx).Model(ctx, "report_extract_config").
Where("id", id).
One()
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("抽取配置不存在: id=%d", id)
}
return nil, err
}
if r.IsEmpty() {
return nil, fmt.Errorf("抽取配置不存在: id=%d", id)
}
if err = r.Struct(&ec); err != nil {
return nil, err
}
return &ec, nil
}
// GetAllBusinesses 获取所有业务配置
func (l *ConfigLoader) GetAllBusinesses(ctx context.Context) ([]model.BusinessConfig, error) {
var businesses []model.BusinessConfig
err := gfdb.DB(ctx).Model(ctx, "report_business_config").
Where("status", model.StatusActive).
Order("id ASC").
Scan(&businesses)
return businesses, err
}
// GetAllReports 获取所有报表配置
func (l *ConfigLoader) GetAllReports(ctx context.Context, businessCode string) ([]model.ReportConfig, error) {
var reports []model.ReportConfig
err := gfdb.DB(ctx).Model(ctx, "report_report_config").
Where("business_code", businessCode).
Where("status", model.StatusActive).
Order("id ASC").
Scan(&reports)
return reports, err
}
// GetReportFields 获取报表可用字段(按角色分类)
func (l *ConfigLoader) GetReportFields(ctx context.Context, businessCode, reportCode string) (*model.GetReportFieldsResp, error) {
fields, err := l.GetFields(ctx, businessCode, reportCode)
if err != nil {
return nil, err
}
resp := &model.GetReportFieldsResp{
BusinessCode: businessCode,
ReportCode: reportCode,
Dimensions: []model.FieldConfig{},
Indicators: []model.FieldConfig{},
Filters: []model.FieldConfig{},
}
for _, f := range fields {
switch f.FieldRole {
case model.RoleDimension:
resp.Dimensions = append(resp.Dimensions, f)
case model.RoleIndicator:
resp.Indicators = append(resp.Indicators, f)
case model.RoleFilter, model.RoleFilterOnly:
resp.Filters = append(resp.Filters, f)
}
}
return resp, nil
}