Files
data-engine/common/report/extract/extract.go

645 lines
19 KiB
Go
Raw Normal View History

2026-06-11 13:06:54 +08:00
package extract
import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"

	"dataengine/common/report/config"
	"dataengine/common/report/model"

	"gitea.redpowerfuture.com/red-future/common/db/gfdb"
	"github.com/gogf/gf/v2/util/gconv"
	"github.com/sirupsen/logrus"
)
// DailyExtractor extracts report data one statistics day at a time and writes
// the result into the report's stat table.
type DailyExtractor struct {
// loader supplies report, extract and field configuration and persists extract logs.
loader *config.ConfigLoader
}
// NewDailyExtractor returns a DailyExtractor backed by the loader obtained
// from config.GetLoader.
func NewDailyExtractor() *DailyExtractor {
	extractor := new(DailyExtractor)
	extractor.loader = config.GetLoader()
	return extractor
}
// ExtractDailyData runs every extract configuration of a report for one
// statistics date. It is intended to be called by the business layer's
// scheduled job.
//
// The method loads the report, extract and field configuration, makes sure the
// stat table exists, and then processes the extract configurations one by one.
// A configuration whose extract log for statDate is already marked successful
// is skipped; otherwise an extract log is created, the extraction is executed
// and the log is updated with counters and the final status.
//
// A non-nil error is returned only when configuration loading or the table
// check fails. Failures of individual extract configurations do not stop the
// remaining ones; they are reported through the response (Success is false and
// ErrorMsg carries the last error seen).
func (e *DailyExtractor) ExtractDailyData(ctx context.Context, businessCode, reportCode, statDate, executor string) (*model.ExtractDailyDataResp, error) {
	start := time.Now()
	logger := logrus.WithFields(logrus.Fields{
		"businessCode": businessCode,
		"reportCode":   reportCode,
		"statDate":     statDate,
	})

	// 1. Load the report configuration.
	report, err := e.loader.GetReport(ctx, businessCode, reportCode)
	if err != nil {
		return nil, fmt.Errorf("获取报表配置失败: %w", err)
	}

	// 2. Load the extract configurations.
	extractConfigs, err := e.loader.GetExtractConfigs(ctx, businessCode, reportCode)
	if err != nil {
		return nil, fmt.Errorf("获取抽取配置失败: %w", err)
	}
	if len(extractConfigs) == 0 {
		return nil, fmt.Errorf("没有可用的抽取配置")
	}

	// 3. Load the field configuration.
	fieldMap, err := e.loader.GetFieldMap(ctx, businessCode, reportCode)
	if err != nil {
		return nil, fmt.Errorf("获取字段配置失败: %w", err)
	}

	// 4. Make sure the stat table exists.
	if err := e.ensureStatTableExists(ctx, report, fieldMap); err != nil {
		return nil, fmt.Errorf("确保统计宽表存在失败: %w", err)
	}

	totalCount := 0
	successCount := 0
	failCount := 0
	var lastErr error

	// 5. Process each extract configuration.
	for _, ec := range extractConfigs {
		// Idempotency check: a lookup failure is logged and the extraction
		// is still attempted.
		exLog, err := e.loader.GetExtractLog(ctx, businessCode, reportCode, ec.ExtractCode, statDate)
		if err != nil {
			logger.Errorf("获取抽取记录失败: %v", err)
		}
		if exLog != nil && exLog.Status == model.ExtractStatusSuccess {
			logger.Infof("抽取配置 %s 日期 %s 已完成,跳过", ec.ExtractCode, statDate)
			continue
		}

		// Each log gets its own start timestamp instead of sharing a pointer
		// to the job-wide start time.
		cfgStart := time.Now()
		extractLog := &model.ExtractLog{
			BusinessCode: businessCode,
			ReportCode:   reportCode,
			ExtractCode:  ec.ExtractCode,
			StatDate:     statDate,
			ExtractType:  ec.ExtractType,
			Status:       model.ExtractStatusRunning,
			Executor:     executor,
			StartTime:    &cfgStart,
		}
		// Creating the log is best effort, but a failure must be visible.
		if createErr := e.loader.CreateExtractLog(ctx, extractLog); createErr != nil {
			logger.Errorf("创建抽取记录失败: %v", createErr)
		}

		// Run the extraction and accumulate the counters.
		c, s, f, err := e.executeExtract(ctx, &ec, report, fieldMap, statDate)
		totalCount += c
		successCount += s
		failCount += f

		// Record the outcome on the extract log.
		now := time.Now()
		extractLog.EndTime = &now
		extractLog.TotalCount = c
		extractLog.SuccessCount = s
		extractLog.FailCount = f
		if err != nil {
			extractLog.Status = model.ExtractStatusFailed
			extractLog.ErrorMessage = err.Error()
			lastErr = err
			logger.Errorf("抽取配置 %s 执行失败: %v", ec.ExtractCode, err)
		} else {
			extractLog.Status = model.ExtractStatusSuccess
			logger.Infof("抽取配置 %s 完成, 总数:%d 成功:%d 失败:%d", ec.ExtractCode, c, s, f)
		}
		if updateErr := e.loader.UpdateExtractLog(ctx, extractLog); updateErr != nil {
			logger.Errorf("更新抽取记录失败: %v", updateErr)
		}
	}

	execTime := time.Since(start).Milliseconds()
	logger.Infof("按天抽取完成, 总数:%d 成功:%d 失败:%d 耗时:%dms", totalCount, successCount, failCount, execTime)

	resp := &model.ExtractDailyDataResp{
		Success:      lastErr == nil,
		TotalCount:   totalCount,
		SuccessCount: successCount,
		FailCount:    failCount,
		ExecTimeMs:   execTime,
	}
	if lastErr != nil {
		resp.ErrorMsg = lastErr.Error()
	}
	return resp, nil
}
// executeExtract runs a single extract configuration for statDate.
//
// It builds the extract SQL once, then pages through the result with
// LIMIT/OFFSET (page size ec.BatchSize, 1000 when not positive) and upserts
// every page into report.StatTableName.
//
// Returns the number of rows read (total), written (success) and not written
// (fail). A non-nil error is returned when the SQL cannot be built or a page
// query fails; write failures are only counted and logged.
//
// NOTE(review): the generated SQL carries no ORDER BY, so page boundaries are
// not guaranteed to be stable between queries — confirm whether a
// deterministic ordering is needed.
func (e *DailyExtractor) executeExtract(ctx context.Context, ec *model.ExtractConfig, report *model.ReportConfig, fieldMap map[string]*model.FieldConfig, statDate string) (total, success, fail int, err error) {
	logger := logrus.WithField("extractCode", ec.ExtractCode)

	// 1. Build the extract SQL.
	extractSQL, whereArgs, err := e.buildExtractSQL(ctx, ec, report, statDate)
	if err != nil {
		return 0, 0, 0, fmt.Errorf("构建抽取SQL失败: %w", err)
	}
	logger.Debugf("抽取SQL: %s", extractSQL)

	// 2. Extract page by page.
	batchSize := ec.BatchSize
	if batchSize <= 0 {
		batchSize = 1000
	}
	offset := 0
	for {
		pagedSQL := fmt.Sprintf("%s LIMIT %d OFFSET %d", extractSQL, batchSize, offset)
		rows, queryErr := gfdb.DB(ctx).GetAll(ctx, pagedSQL, whereArgs...)
		if queryErr != nil {
			return total, success, fail, fmt.Errorf("抽取查询失败: %w", queryErr)
		}
		batchCount := rows.Len()
		if batchCount == 0 {
			break
		}

		// 3. Apply runtime transform rules. Only non-aggregate rows get the
		// mapping rules and audit fields injected here; aggregate rows are
		// used exactly as the SQL produced them.
		dataList := rows.List()
		if ec.ExtractMode != model.ExtractModeAggregate {
			for i := range dataList {
				e.applyTransformRules(ec, dataList[i])
				dataList[i]["tenant_id"] = 1
				dataList[i]["business_code"] = ec.BusinessCode
			}
		}

		// 4. Write the page into the stat table.
		written, _, writeErr := e.batchUpsert(ctx, report.StatTableName, report.ConflictKeys, dataList)
		if writeErr != nil {
			logger.Errorf("批量写入失败 (offset=%d): %v", offset, writeErr)
			fail += batchCount
		} else {
			success += written
			// Rows the upsert could not write count as failures so that
			// total == success + fail holds.
			fail += batchCount - written
		}
		total += batchCount
		offset += batchSize

		// A short page means the result set is exhausted.
		if batchCount < batchSize {
			break
		}
	}
	return total, success, fail, nil
}
// buildExtractSQL builds the extract SELECT statement for one configuration.
//
// The source table is referenced through ec.SourceTableAlias, or "s" when no
// alias is configured. The date column defaults to "stat_date" and the extract
// mode defaults to model.ExtractModeDirect. Aggregate mode is delegated to
// buildAggregateExtractSQL, everything else to buildDirectExtractSQL.
//
// Returns the SQL text, its bind arguments (nil for direct mode) and an error.
func (e *DailyExtractor) buildExtractSQL(ctx context.Context, ec *model.ExtractConfig, report *model.ReportConfig, statDate string) (string, []interface{}, error) {
	// Alias under which the source table appears in the query.
	sourceTable := "s"
	if ec.SourceTableAlias != "" {
		sourceTable = ec.SourceTableAlias
	}

	// Name of the date column in the stat table.
	dateField := report.DateField
	if dateField == "" {
		dateField = "stat_date"
	}

	// Resolve the extract mode.
	mode := ec.ExtractMode
	if mode == "" {
		mode = model.ExtractModeDirect
	}
	if mode == model.ExtractModeAggregate {
		return e.buildAggregateExtractSQL(ec, report, sourceTable, dateField, statDate)
	}

	// Default: direct, row-by-row extraction.
	return e.buildDirectExtractSQL(ec, report, sourceTable, dateField, statDate), nil, nil
}
// buildDirectExtractSQL builds the SELECT for direct mode: every source row is
// mapped to one output row, without aggregation.
//
// The select list consists of constant audit columns, the statistics date as a
// constant under dateField, an empty raw_data JSONB value, the configured
// field mappings (optionally transformed and wrapped in COALESCE with the
// default value) and the field mappings of the joined tables.
//
// Values placed inside quoted SQL literals (business code, statDate, default
// values) have embedded single quotes doubled so they cannot terminate the
// literal.
//
// NOTE(review): every row is selected with "0 AS id" — confirm how the write
// path treats that value against the table's primary key.
func (e *DailyExtractor) buildDirectExtractSQL(ec *model.ExtractConfig, report *model.ReportConfig, sourceTable, dateField, statDate string) string {
	// sqlLiteral renders v as a single-quoted SQL string literal.
	sqlLiteral := func(v interface{}) string {
		return "'" + strings.ReplaceAll(fmt.Sprint(v), "'", "''") + "'"
	}

	var selectParts []string

	// Constant audit columns.
	selectParts = append(selectParts, "0 AS id")
	selectParts = append(selectParts, "1 AS tenant_id")
	selectParts = append(selectParts, fmt.Sprintf("%s AS business_code", sqlLiteral(ec.BusinessCode)))
	selectParts = append(selectParts, "'system' AS creator")
	selectParts = append(selectParts, "NOW() AS created_at")
	selectParts = append(selectParts, "'system' AS updater")
	selectParts = append(selectParts, "NOW() AS updated_at")
	selectParts = append(selectParts, "NULL::TIMESTAMP AS deleted_at")

	// Statistics date as a constant column.
	selectParts = append(selectParts, fmt.Sprintf("%s AS %s", sqlLiteral(statDate), dateField))

	// Raw data placeholder.
	selectParts = append(selectParts, "'{}'::JSONB AS raw_data")

	// Field mappings of the source table.
	for _, mapping := range ec.FieldMappings {
		expr := fmt.Sprintf("%s.%s", sourceTable, mapping.SourceField)
		if mapping.TransformRule != nil {
			expr = e.applyTransformExpr(mapping.TransformRule, expr)
		}
		if mapping.DefaultValue != nil {
			expr = fmt.Sprintf("COALESCE(%s, %s)", expr, sqlLiteral(mapping.DefaultValue))
		}
		selectParts = append(selectParts, fmt.Sprintf("%s AS %s", expr, mapping.TargetField))
	}

	// FROM + JOIN.
	fromClause := e.buildFromClause(ec, sourceTable)

	// Field mappings of the joined tables.
	selectParts = append(selectParts, e.buildJoinFieldSelects(ec)...)

	// WHERE.
	whereClause := e.buildWhereClause(ec, sourceTable, statDate)

	return fmt.Sprintf("SELECT %s FROM %s %s", strings.Join(selectParts, ", "), fromClause, whereClause)
}
// buildAggregateExtractSQL builds the SELECT for aggregate mode (GROUP BY with
// SUM/COUNT/AVG style functions).
//
// The select list consists of constant audit columns (id is a ROW_NUMBER()
// pseudo sequence), the statistics date as a constant under dateField, an
// empty raw_data JSONB value, every entry of ec.GroupByFields, and one
// aggregated expression per field mapping. A mapping whose source field is a
// group-by field is skipped because that column is already selected. A mapping
// without an aggregate function is wrapped in MAX so that it is valid under
// GROUP BY.
//
// Values placed inside quoted SQL literals (business code, statDate) have
// embedded single quotes doubled. Default values are inserted unquoted, as
// before.
//
// Returns the SQL text, nil bind arguments and a nil error.
func (e *DailyExtractor) buildAggregateExtractSQL(ec *model.ExtractConfig, report *model.ReportConfig, sourceTable, dateField, statDate string) (string, []interface{}, error) {
	// sqlLiteral renders v as a single-quoted SQL string literal.
	sqlLiteral := func(v interface{}) string {
		return "'" + strings.ReplaceAll(fmt.Sprint(v), "'", "''") + "'"
	}

	var selectParts []string
	var groupByParts []string
	var args []interface{}

	// Constant audit columns.
	selectParts = append(selectParts, "ROW_NUMBER() OVER () AS id") // pseudo auto-increment id
	selectParts = append(selectParts, "1 AS tenant_id")
	selectParts = append(selectParts, fmt.Sprintf("%s AS business_code", sqlLiteral(ec.BusinessCode)))
	selectParts = append(selectParts, "'system' AS creator")
	selectParts = append(selectParts, "NOW() AS created_at")
	selectParts = append(selectParts, "'system' AS updater")
	selectParts = append(selectParts, "NOW() AS updated_at")
	selectParts = append(selectParts, "NULL::TIMESTAMP AS deleted_at")

	// Statistics date as a constant column.
	selectParts = append(selectParts, fmt.Sprintf("%s AS %s", sqlLiteral(statDate), dateField))

	// Raw data placeholder.
	selectParts = append(selectParts, "'{}'::JSONB AS raw_data")

	// Group-by fields go into both SELECT and GROUP BY; the set allows a
	// quick membership test while processing the mappings.
	gbySet := make(map[string]bool, len(ec.GroupByFields))
	for _, gbf := range ec.GroupByFields {
		gbySet[gbf] = true
		qualified := fmt.Sprintf("%s.%s", sourceTable, gbf)
		selectParts = append(selectParts, qualified)
		groupByParts = append(groupByParts, qualified)
	}

	// Field mappings: the aggregate function decides how each one is folded.
	for _, mapping := range ec.FieldMappings {
		// Group-by columns were already added above.
		if gbySet[mapping.SourceField] {
			continue
		}

		sourceExpr := fmt.Sprintf("%s.%s", sourceTable, mapping.SourceField)
		if mapping.TransformRule != nil {
			sourceExpr = e.applyTransformExpr(mapping.TransformRule, sourceExpr)
		}

		// Without a configured function, fall back to MAX so the column is
		// legal in a grouped query.
		aggFunc := strings.ToUpper(mapping.AggregateFunction)
		if aggFunc == "" {
			aggFunc = "MAX"
		}
		expr := fmt.Sprintf("%s(%s)", aggFunc, sourceExpr)
		if mapping.DefaultValue != nil {
			expr = fmt.Sprintf("COALESCE(%s, %v)", expr, mapping.DefaultValue)
		}
		selectParts = append(selectParts, fmt.Sprintf("%s AS %s", expr, mapping.TargetField))
	}

	// FROM + JOIN.
	fromClause := e.buildFromClause(ec, sourceTable)

	// WHERE.
	whereClause := e.buildWhereClause(ec, sourceTable, statDate)

	// Assemble the statement.
	sql := fmt.Sprintf("SELECT %s FROM %s %s",
		strings.Join(selectParts, ", "),
		fromClause,
		whereClause)

	// GROUP BY.
	if len(groupByParts) > 0 {
		sql += " GROUP BY " + strings.Join(groupByParts, ", ")
	}
	return sql, args, nil
}
// buildFromClause renders the FROM part of the extract query: the source table
// with its alias followed by one JOIN per entry in ec.JoinConfigs.
//
// A join type of "INNER" or "RIGHT" (case-insensitive) selects that join; any
// other value yields a LEFT JOIN. A join without an alias is aliased by its
// own table name.
func (e *DailyExtractor) buildFromClause(ec *model.ExtractConfig, sourceTable string) string {
	var sb strings.Builder
	sb.WriteString(fmt.Sprintf("%s %s", ec.SourceTableName, sourceTable))

	for _, join := range ec.JoinConfigs {
		var keyword string
		switch strings.ToUpper(join.JoinType) {
		case "INNER":
			keyword = "INNER JOIN"
		case "RIGHT":
			keyword = "RIGHT JOIN"
		default:
			keyword = "LEFT JOIN"
		}

		alias := join.JoinAlias
		if alias == "" {
			alias = join.JoinTable
		}

		fmt.Fprintf(&sb, " %s %s %s ON %s", keyword, join.JoinTable, alias, join.JoinCondition)
	}
	return sb.String()
}
// buildJoinFieldSelects returns the select-list entries for the field mappings
// of every joined table, each in the form "<expr> AS <target>".
//
// Columns are qualified with the join alias, or with the join table name when
// no alias is configured. The result is nil when there is nothing to select.
func (e *DailyExtractor) buildJoinFieldSelects(ec *model.ExtractConfig) []string {
	var selects []string
	for _, join := range ec.JoinConfigs {
		qualifier := join.JoinAlias
		if qualifier == "" {
			qualifier = join.JoinTable
		}
		for _, fm := range join.FieldMappings {
			column := fmt.Sprintf("%s.%s", qualifier, fm.SourceField)
			if fm.TransformRule != nil {
				column = e.applyTransformExpr(fm.TransformRule, column)
			}
			selects = append(selects, fmt.Sprintf("%s AS %s", column, fm.TargetField))
		}
	}
	return selects
}
// buildWhereClause renders the WHERE part of the extract query, or an empty
// string when no condition applies.
//
// For incremental extraction with a configured key field, rows are restricted
// to those whose key field, cast to date, equals statDate. The configured
// filter expression, if any, is appended with AND.
//
// Single quotes inside statDate are doubled so the value cannot terminate the
// quoted literal it is placed in.
//
// NOTE(review): ec.FilterExpression is inserted verbatim and unparenthesised;
// an expression containing a top-level OR would bind more loosely than the
// date condition — confirm the configured expressions account for that.
func (e *DailyExtractor) buildWhereClause(ec *model.ExtractConfig, sourceTable, statDate string) string {
	var whereConditions []string

	// Date restriction for incremental extraction.
	if ec.ExtractType == model.ExtractTypeIncremental && ec.ExtractKeyField != "" {
		dateLiteral := strings.ReplaceAll(statDate, "'", "''")
		dateCondition := fmt.Sprintf("%s.%s::date = '%s'", sourceTable, ec.ExtractKeyField, dateLiteral)
		whereConditions = append(whereConditions, dateCondition)
	}

	// Custom filter condition.
	if ec.FilterExpression != "" {
		whereConditions = append(whereConditions, ec.FilterExpression)
	}

	if len(whereConditions) == 0 {
		return ""
	}
	return "WHERE " + strings.Join(whereConditions, " AND ")
}
// applyTransformExpr wraps sourceExpr in the SQL expression described by rule.
//
//   - "CALCULATE": rule.Expression with every "{source}" placeholder replaced
//     by sourceExpr.
//   - "FORMAT": TO_CHAR(sourceExpr, '<rule.Format>'); single quotes inside the
//     format are doubled so they stay within the literal.
//
// Every other rule type, including "MAPPING" (which is applied at runtime by
// applyTransformRules), and any rule whose expression or format is empty,
// returns sourceExpr unchanged.
func (e *DailyExtractor) applyTransformExpr(rule *model.TransformRule, sourceExpr string) string {
	switch rule.RuleType {
	case "CALCULATE":
		if rule.Expression != "" {
			return strings.ReplaceAll(rule.Expression, "{source}", sourceExpr)
		}
	case "FORMAT":
		if rule.Format != "" {
			format := strings.ReplaceAll(rule.Format, "'", "''")
			return fmt.Sprintf("TO_CHAR(%s, '%s')", sourceExpr, format)
		}
	}
	return sourceExpr
}
// applyTransformRules applies the runtime "MAPPING" rules of ec to row in
// place; rules of any other type are ignored.
//
// For a mapping rule, Expression holds the source field name and RuleCode the
// target field name. When the row contains the source field and its string
// form is a key of rule.Mapping, the mapped value is stored under the target
// field.
func (e *DailyExtractor) applyTransformRules(ec *model.ExtractConfig, row map[string]interface{}) {
	for _, rule := range ec.TransformRules {
		if rule.RuleType == "MAPPING" {
			raw, present := row[rule.Expression]
			if !present {
				continue
			}
			if replacement, hit := rule.Mapping[gconv.String(raw)]; hit {
				row[rule.RuleCode] = replacement
			}
		}
	}
}
// ensureStatTableExists creates the report's stat table when pg_tables has no
// entry for its lower-cased name; otherwise it only logs that the table is
// already there.
//
// Returns the lookup error, or whatever createStatTable returns.
func (e *DailyExtractor) ensureStatTableExists(ctx context.Context, report *model.ReportConfig, fieldMap map[string]*model.FieldConfig) error {
	tableName := report.StatTableName

	// Look the table up in the catalog.
	found, err := gfdb.DB(ctx).GetAll(ctx, "SELECT COUNT(*) FROM pg_tables WHERE tablename = $1", strings.ToLower(tableName))
	if err != nil {
		return err
	}

	// No row, or a zero count, means the table has to be created.
	if len(found) == 0 || found[0]["count"].Int() == 0 {
		return e.createStatTable(ctx, report, fieldMap)
	}

	logrus.Infof("统计宽表 %s 已存在", tableName)
	return nil
}
// createStatTable creates the report's stat table, its unique conflict index
// and the column comments.
//
// The table has the standard audit columns, the date column (report.DateField,
// "stat_date" when empty), one column per entry of fieldMap and a raw_data
// JSONB column. Business columns are emitted in sorted key order so the
// generated DDL is the same on every run instead of following Go's random map
// iteration order.
//
// Only a failing CREATE TABLE is returned as an error. Failures while creating
// the index or adding comments are logged as warnings.
//
// NOTE(review): batchUpsert relies on report.ConflictKeys for its ON CONFLICT
// target, so a failed index creation is likely to surface later as write
// errors — confirm whether it should abort here instead.
func (e *DailyExtractor) createStatTable(ctx context.Context, report *model.ReportConfig, fieldMap map[string]*model.FieldConfig) error {
	tableName := report.StatTableName

	// Date column name.
	dateField := report.DateField
	if dateField == "" {
		dateField = "stat_date"
	}

	// Fix the order in which business fields are visited.
	fieldKeys := make([]string, 0, len(fieldMap))
	for key := range fieldMap {
		fieldKeys = append(fieldKeys, key)
	}
	sort.Strings(fieldKeys)

	// Standard audit columns.
	cols := []string{
		"id BIGSERIAL PRIMARY KEY",
		"tenant_id BIGINT NOT NULL DEFAULT 0",
		"business_code VARCHAR(64) NOT NULL DEFAULT ''",
		"creator VARCHAR(64) DEFAULT ''",
		"created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()",
		"updater VARCHAR(64) DEFAULT ''",
		"updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()",
		"deleted_at TIMESTAMP WITH TIME ZONE",
	}

	// Date column.
	cols = append(cols, fmt.Sprintf("%s VARCHAR(16) NOT NULL DEFAULT ''", dateField))

	// Business columns.
	for _, key := range fieldKeys {
		fc := fieldMap[key]
		cols = append(cols, fmt.Sprintf("%s %s", fc.FieldCode, fieldTypeToPG(fc.FieldType)))
	}

	// Raw data column.
	cols = append(cols, "raw_data JSONB DEFAULT '{}'")

	sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (\n %s\n)", tableName, strings.Join(cols, ",\n "))
	logrus.Infof("创建统计宽表: %s", tableName)
	if _, err := gfdb.DB(ctx).Exec(ctx, sql); err != nil {
		return fmt.Errorf("建表失败: %w", err)
	}

	// Unique index backing the upsert conflict target.
	if len(report.ConflictKeys) > 0 {
		indexName := fmt.Sprintf("uq_%s_conflict", tableName)
		indexCols := strings.Join(report.ConflictKeys, ", ")
		indexSQL := fmt.Sprintf("CREATE UNIQUE INDEX IF NOT EXISTS %s ON %s (%s)", indexName, tableName, indexCols)
		if _, err := gfdb.DB(ctx).Exec(ctx, indexSQL); err != nil {
			logrus.Warnf("创建冲突索引失败: %v", err)
		}
	}

	// Column comments taken from the field names.
	for _, key := range fieldKeys {
		fc := fieldMap[key]
		if fc.FieldName == "" {
			continue
		}
		// Double single quotes so the name stays inside the SQL literal.
		escaped := strings.ReplaceAll(fc.FieldName, "'", "''")
		commentSQL := fmt.Sprintf("COMMENT ON COLUMN %s.%s IS '%s'", tableName, fc.FieldCode, escaped)
		if _, err := gfdb.DB(ctx).Exec(ctx, commentSQL); err != nil {
			logrus.Warnf("添加字段注释失败 [%s.%s]: %v", tableName, fc.FieldCode, err)
		}
	}
	return nil
}
// batchUpsert writes rows into tableName in chunks of 100 using Save, with
// conflictKeys as the conflict target when any are given.
//
// Every row gets its "updated_at" set to the current time first; nil rows are
// replaced by empty maps. When saving a chunk fails, its rows are retried one
// by one so that a single bad row does not discard the whole chunk.
//
// Returns the number of rows written. The second result is always nil. The
// error is non-nil only when writes failed and not a single row could be
// written; partial failures are logged and show up as a count lower than
// len(rows).
func (e *DailyExtractor) batchUpsert(ctx context.Context, tableName string, conflictKeys []string, rows []map[string]interface{}) (int, []string, error) {
	if len(rows) == 0 {
		return 0, nil, nil
	}

	now := time.Now()
	for i := range rows {
		if rows[i] == nil {
			rows[i] = make(map[string]interface{})
		}
		rows[i]["updated_at"] = now
	}

	// OnConflict takes variadic interface values; convert the keys once.
	conflictArgs := make([]interface{}, len(conflictKeys))
	for i, key := range conflictKeys {
		conflictArgs[i] = key
	}

	const chunkSize = 100
	written := 0
	var lastErr error
	for begin := 0; begin < len(rows); begin += chunkSize {
		end := begin + chunkSize
		if end > len(rows) {
			end = len(rows)
		}
		chunk := rows[begin:end]

		m := gfdb.DB(ctx).Model(ctx, tableName).Data(chunk)
		if len(conflictArgs) > 0 {
			m = m.OnConflict(conflictArgs...)
		}
		_, err := m.Save()
		if err == nil {
			written += len(chunk)
			continue
		}
		logrus.Errorf("批量写入 %s 失败: %v", tableName, err)
		lastErr = err

		// Retry row by row.
		for _, row := range chunk {
			rm := gfdb.DB(ctx).Model(ctx, tableName).Data(row)
			if len(conflictArgs) > 0 {
				rm = rm.OnConflict(conflictArgs...)
			}
			if _, rowErr := rm.Save(); rowErr != nil {
				logrus.Errorf("逐条写入失败: %v", rowErr)
				lastErr = rowErr
				continue
			}
			written++
		}
	}

	// Surface a total failure instead of reporting success with zero rows.
	if written == 0 && lastErr != nil {
		return 0, nil, fmt.Errorf("批量写入 %s 全部失败: %w", tableName, lastErr)
	}
	return written, nil, nil
}
// fieldTypeToPG maps a configured field type to the PostgreSQL column
// definition (type plus default) used when creating the stat table. Unknown
// field types fall back to VARCHAR(256).
func fieldTypeToPG(fieldType string) string {
	pgType := "VARCHAR(256) DEFAULT ''"
	switch fieldType {
	case model.FieldTypeInt:
		pgType = "NUMERIC(20,0) DEFAULT 0"
	case model.FieldTypeFloat:
		pgType = "NUMERIC(20,4) DEFAULT 0"
	case model.FieldTypeDate:
		pgType = "VARCHAR(16) DEFAULT ''"
	case model.FieldTypeDatetime:
		pgType = "TIMESTAMP WITH TIME ZONE"
	case model.FieldTypeJsonb:
		pgType = "JSONB DEFAULT '{}'"
	}
	return pgType
}