Files
data-engine/common/report/extract/extract.go
2026-06-11 13:06:54 +08:00

645 lines
19 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package extract
import (
"context"
"fmt"
"strings"
"time"
"dataengine/common/report/config"
"dataengine/common/report/model"
"gitea.redpowerfuture.com/red-future/common/db/gfdb"
"github.com/gogf/gf/v2/util/gconv"
"github.com/sirupsen/logrus"
)
// DailyExtractor extracts source data at day granularity and writes it into
// a report's stat table.
type DailyExtractor struct {
// loader is used to read report, field and extract configuration and to
// read and write extract logs.
loader *config.ConfigLoader
}
// NewDailyExtractor returns a DailyExtractor whose loader is the one returned
// by config.GetLoader.
func NewDailyExtractor() *DailyExtractor {
	extractor := new(DailyExtractor)
	extractor.loader = config.GetLoader()
	return extractor
}
// ExtractDailyData runs every extract configuration of a report for one stat
// date. It is meant to be called by the business layer's scheduled job.
//
// It loads the report, extract and field configuration, makes sure the stat
// table exists, and then processes each extract configuration in turn: a
// configuration that already has a successful extract log for statDate is
// skipped, otherwise it is executed and the outcome is written to its extract
// log.
//
// A non-nil error is returned only when configuration loading or the stat
// table check fails. Failures of individual extract configurations do not stop
// the run; they are reported through resp.Success and resp.ErrorMsg (the last
// failure wins) together with a nil error.
func (e *DailyExtractor) ExtractDailyData(ctx context.Context, businessCode, reportCode, statDate, executor string) (*model.ExtractDailyDataResp, error) {
	start := time.Now()
	logger := logrus.WithFields(logrus.Fields{
		"businessCode": businessCode,
		"reportCode":   reportCode,
		"statDate":     statDate,
	})

	// 1. Report configuration.
	report, err := e.loader.GetReport(ctx, businessCode, reportCode)
	if err != nil {
		return nil, fmt.Errorf("获取报表配置失败: %w", err)
	}

	// 2. Extract configurations; there is nothing to do without at least one.
	extractConfigs, err := e.loader.GetExtractConfigs(ctx, businessCode, reportCode)
	if err != nil {
		return nil, fmt.Errorf("获取抽取配置失败: %w", err)
	}
	if len(extractConfigs) == 0 {
		return nil, fmt.Errorf("没有可用的抽取配置")
	}

	// 3. Field configuration.
	fieldMap, err := e.loader.GetFieldMap(ctx, businessCode, reportCode)
	if err != nil {
		return nil, fmt.Errorf("获取字段配置失败: %w", err)
	}

	// 4. Make sure the stat table exists before anything is written to it.
	if err := e.ensureStatTableExists(ctx, report, fieldMap); err != nil {
		return nil, fmt.Errorf("确保统计宽表存在失败: %w", err)
	}

	totalCount, successCount, failCount := 0, 0, 0
	var lastErr error

	// 5. Run every extract configuration.
	for _, ec := range extractConfigs {
		// Idempotency check. A lookup failure is only logged: the extraction
		// is then executed as if no log existed.
		exLog, logErr := e.loader.GetExtractLog(ctx, businessCode, reportCode, ec.ExtractCode, statDate)
		if logErr != nil {
			logger.Errorf("获取抽取记录失败: %v", logErr)
		}
		if exLog != nil && exLog.Status == model.ExtractStatusSuccess {
			logger.Infof("抽取配置 %s 日期 %s 已完成,跳过", ec.ExtractCode, statDate)
			continue
		}

		// Each configuration records its own start time; sharing the job-wide
		// start would make every log claim it began when the whole job did.
		cfgStart := time.Now()
		extractLog := &model.ExtractLog{
			BusinessCode: businessCode,
			ReportCode:   reportCode,
			ExtractCode:  ec.ExtractCode,
			StatDate:     statDate,
			ExtractType:  ec.ExtractType,
			Status:       model.ExtractStatusRunning,
			Executor:     executor,
			StartTime:    &cfgStart,
		}
		// Log bookkeeping is best-effort: a failure here must not prevent the
		// extraction, but it should not disappear silently either.
		if createErr := e.loader.CreateExtractLog(ctx, extractLog); createErr != nil {
			logger.Errorf("创建抽取记录失败: %v", createErr)
		}

		// Run the extraction; the counts are meaningful even on error.
		c, s, f, runErr := e.executeExtract(ctx, &ec, report, fieldMap, statDate)
		totalCount += c
		successCount += s
		failCount += f

		// Record the outcome.
		finished := time.Now()
		extractLog.EndTime = &finished
		extractLog.TotalCount = c
		extractLog.SuccessCount = s
		extractLog.FailCount = f
		if runErr != nil {
			extractLog.Status = model.ExtractStatusFailed
			extractLog.ErrorMessage = runErr.Error()
			lastErr = runErr
			logger.Errorf("抽取配置 %s 执行失败: %v", ec.ExtractCode, runErr)
		} else {
			extractLog.Status = model.ExtractStatusSuccess
			logger.Infof("抽取配置 %s 完成, 总数:%d 成功:%d 失败:%d", ec.ExtractCode, c, s, f)
		}
		if updateErr := e.loader.UpdateExtractLog(ctx, extractLog); updateErr != nil {
			logger.Errorf("更新抽取记录失败: %v", updateErr)
		}
	}

	execTime := time.Since(start).Milliseconds()
	logger.Infof("按天抽取完成, 总数:%d 成功:%d 失败:%d 耗时:%dms", totalCount, successCount, failCount, execTime)

	resp := &model.ExtractDailyDataResp{
		Success:      lastErr == nil,
		TotalCount:   totalCount,
		SuccessCount: successCount,
		FailCount:    failCount,
		ExecTimeMs:   execTime,
	}
	if lastErr != nil {
		resp.ErrorMsg = lastErr.Error()
	}
	return resp, nil
}
// executeExtract runs a single extract configuration: it builds the extract
// SQL, reads the source in pages of ec.BatchSize rows (1000 when the
// configured size is not positive) and upserts every page into the report's
// stat table.
//
// total is the number of rows read, success the number of rows written and
// fail the number of rows that could not be written. A write failure does not
// stop the paging, but once all pages are processed a non-nil error is
// returned if any row failed, so the caller does not record the run as
// successful. A query failure aborts immediately; the counts accumulated so
// far are returned with the error.
//
// NOTE(review): the pages are produced with LIMIT/OFFSET and the generated SQL
// has no ORDER BY. PostgreSQL does not guarantee a stable row order in that
// case, so pages may overlap or skip rows — confirm and add an ordering.
func (e *DailyExtractor) executeExtract(ctx context.Context, ec *model.ExtractConfig, report *model.ReportConfig, fieldMap map[string]*model.FieldConfig, statDate string) (total, success, fail int, err error) {
	logger := logrus.WithField("extractCode", ec.ExtractCode)

	// 1. Build the extract SQL.
	extractSQL, whereArgs, err := e.buildExtractSQL(ctx, ec, report, statDate)
	if err != nil {
		return 0, 0, 0, fmt.Errorf("构建抽取SQL失败: %w", err)
	}
	logger.Debugf("抽取SQL: %s", extractSQL)

	// 2. Page through the source.
	batchSize := ec.BatchSize
	if batchSize <= 0 {
		batchSize = 1000
	}

	for offset := 0; ; offset += batchSize {
		pagedSQL := fmt.Sprintf("%s LIMIT %d OFFSET %d", extractSQL, batchSize, offset)
		rows, queryErr := gfdb.DB(ctx).GetAll(ctx, pagedSQL, whereArgs...)
		if queryErr != nil {
			return total, success, fail, fmt.Errorf("抽取查询失败: %w", queryErr)
		}
		batchCount := rows.Len()
		if batchCount == 0 {
			break
		}

		// 3. Runtime transform rules and audit fields are applied to every
		// row unless the configuration is in AGGREGATE mode.
		dataList := rows.List()
		if ec.ExtractMode != model.ExtractModeAggregate {
			for i := range dataList {
				e.applyTransformRules(ec, dataList[i])
				dataList[i]["tenant_id"] = 1
				dataList[i]["business_code"] = ec.BusinessCode
			}
		}

		// 4. Write the page. batchUpsert reports rows it could not write only
		// through the returned count, so the difference to the page size is
		// what failed.
		written, _, writeErr := e.batchUpsert(ctx, report.StatTableName, report.ConflictKeys, dataList)
		if writeErr != nil {
			logger.Errorf("批量写入失败 (offset=%d): %v", offset, writeErr)
			fail += batchCount
		} else {
			if written > batchCount {
				written = batchCount
			}
			success += written
			fail += batchCount - written
		}
		total += batchCount

		// A short page means the source is exhausted.
		if batchCount < batchSize {
			break
		}
	}

	if fail > 0 {
		return total, success, fail, fmt.Errorf("写入统计宽表失败: %d/%d 行未写入", fail, total)
	}
	return total, success, fail, nil
}
// buildExtractSQL builds the SELECT statement for an extract configuration
// and returns it together with its bind arguments.
//
// Source columns are qualified with ec.SourceTableAlias, or "s" when no alias
// is configured. The date column is report.DateField, or "stat_date" when
// empty. AGGREGATE mode is delegated to buildAggregateExtractSQL; every other
// mode, including an empty one, uses buildDirectExtractSQL, which takes no bind
// arguments.
func (e *DailyExtractor) buildExtractSQL(ctx context.Context, ec *model.ExtractConfig, report *model.ReportConfig, statDate string) (string, []interface{}, error) {
	sourceAlias := ec.SourceTableAlias
	if sourceAlias == "" {
		sourceAlias = "s"
	}

	dateField := report.DateField
	if dateField == "" {
		dateField = "stat_date"
	}

	// An unset mode means DIRECT.
	mode := ec.ExtractMode
	if mode == "" {
		mode = model.ExtractModeDirect
	}
	if mode == model.ExtractModeAggregate {
		return e.buildAggregateExtractSQL(ec, report, sourceAlias, dateField, statDate)
	}

	return e.buildDirectExtractSQL(ec, report, sourceAlias, dateField, statDate), nil, nil
}
// buildDirectExtractSQL builds the row-by-row (DIRECT mode) extract SQL: each
// source row is mapped to one output row, without aggregation.
//
// The SELECT list consists of constant audit columns, the stat date as a
// constant under dateField, an empty raw_data JSONB value, one column per
// entry of ec.FieldMappings (with its transform rule and default value
// applied) and the columns mapped from joined tables. sourceTable is the alias
// used to qualify source columns.
//
// Values embedded as SQL string literals (business code, stat date, default
// values) have their single quotes doubled so that a quote inside the value
// cannot terminate the literal.
func (e *DailyExtractor) buildDirectExtractSQL(ec *model.ExtractConfig, report *model.ReportConfig, sourceTable, dateField, statDate string) string {
	// sqlLiteral renders v as a single-quoted SQL string literal.
	sqlLiteral := func(v interface{}) string {
		return "'" + strings.ReplaceAll(fmt.Sprint(v), "'", "''") + "'"
	}

	selectParts := []string{
		// Audit columns, injected as constants.
		"0 AS id",
		"1 AS tenant_id",
		fmt.Sprintf("%s AS business_code", sqlLiteral(ec.BusinessCode)),
		"'system' AS creator",
		"NOW() AS created_at",
		"'system' AS updater",
		"NOW() AS updated_at",
		"NULL::TIMESTAMP AS deleted_at",
		// Stat date.
		fmt.Sprintf("%s AS %s", sqlLiteral(statDate), dateField),
		// Raw data placeholder.
		"'{}'::JSONB AS raw_data",
	}

	// Field mappings of the source table.
	for _, mapping := range ec.FieldMappings {
		expr := fmt.Sprintf("%s.%s", sourceTable, mapping.SourceField)
		if mapping.TransformRule != nil {
			expr = e.applyTransformExpr(mapping.TransformRule, expr)
		}
		if mapping.DefaultValue != nil {
			expr = fmt.Sprintf("COALESCE(%s, %s)", expr, sqlLiteral(mapping.DefaultValue))
		}
		selectParts = append(selectParts, fmt.Sprintf("%s AS %s", expr, mapping.TargetField))
	}

	// FROM + JOIN.
	fromClause := e.buildFromClause(ec, sourceTable)

	// Field mappings of the joined tables.
	selectParts = append(selectParts, e.buildJoinFieldSelects(ec)...)

	// WHERE.
	whereClause := e.buildWhereClause(ec, sourceTable, statDate)

	return fmt.Sprintf("SELECT %s FROM %s %s", strings.Join(selectParts, ", "), fromClause, whereClause)
}
// buildAggregateExtractSQL builds the AGGREGATE mode extract SQL: source rows
// are grouped by ec.GroupByFields and the mapped fields are aggregated.
//
// The SELECT list consists of constant audit columns (id is ROW_NUMBER()), the
// stat date as a constant under dateField, an empty raw_data JSONB value, the
// group-by columns under their source names, and one aggregated column per
// field mapping. A mapping whose source field is a group-by column is skipped,
// since that column is already selected. A mapping without an aggregate
// function is wrapped in MAX so the statement stays valid under GROUP BY.
//
// The returned bind arguments are always nil and the error is always nil.
//
// The business code and stat date are embedded as SQL string literals with
// their single quotes doubled. Default values are embedded verbatim, unquoted.
func (e *DailyExtractor) buildAggregateExtractSQL(ec *model.ExtractConfig, report *model.ReportConfig, sourceTable, dateField, statDate string) (string, []interface{}, error) {
	// sqlLiteral renders v as a single-quoted SQL string literal.
	sqlLiteral := func(v interface{}) string {
		return "'" + strings.ReplaceAll(fmt.Sprint(v), "'", "''") + "'"
	}

	selectParts := []string{
		// Audit columns; constants in aggregate mode.
		"ROW_NUMBER() OVER () AS id", // pseudo auto-increment id
		"1 AS tenant_id",
		fmt.Sprintf("%s AS business_code", sqlLiteral(ec.BusinessCode)),
		"'system' AS creator",
		"NOW() AS created_at",
		"'system' AS updater",
		"NOW() AS updated_at",
		"NULL::TIMESTAMP AS deleted_at",
		// Stat date (constant).
		fmt.Sprintf("%s AS %s", sqlLiteral(statDate), dateField),
		// Raw data placeholder.
		"'{}'::JSONB AS raw_data",
	}

	// Group-by columns go into both SELECT and GROUP BY; the set allows the
	// mapping loop below to recognise them.
	gbySet := make(map[string]bool, len(ec.GroupByFields))
	groupByParts := make([]string, 0, len(ec.GroupByFields))
	for _, gbf := range ec.GroupByFields {
		gbySet[gbf] = true
		column := fmt.Sprintf("%s.%s", sourceTable, gbf)
		selectParts = append(selectParts, column)
		groupByParts = append(groupByParts, column)
	}

	// Field mappings: aggregate everything that is not a group-by column.
	for _, mapping := range ec.FieldMappings {
		sourceField := mapping.SourceField
		if gbySet[sourceField] {
			// Already selected as a group-by column.
			continue
		}

		sourceExpr := fmt.Sprintf("%s.%s", sourceTable, sourceField)
		if mapping.TransformRule != nil {
			sourceExpr = e.applyTransformExpr(mapping.TransformRule, sourceExpr)
		}

		// Without a configured function the value is taken with MAX: a
		// non-grouped column must be aggregated to be selectable.
		aggFunc := strings.ToUpper(mapping.AggregateFunction)
		if aggFunc == "" {
			aggFunc = "MAX"
		}
		expr := fmt.Sprintf("%s(%s)", aggFunc, sourceExpr)
		if mapping.DefaultValue != nil {
			expr = fmt.Sprintf("COALESCE(%s, %v)", expr, mapping.DefaultValue)
		}
		selectParts = append(selectParts, fmt.Sprintf("%s AS %s", expr, mapping.TargetField))
	}

	// FROM + JOIN.
	fromClause := e.buildFromClause(ec, sourceTable)
	// WHERE.
	whereClause := e.buildWhereClause(ec, sourceTable, statDate)

	sql := fmt.Sprintf("SELECT %s FROM %s %s",
		strings.Join(selectParts, ", "),
		fromClause,
		whereClause)

	// GROUP BY.
	if len(groupByParts) > 0 {
		sql += " GROUP BY " + strings.Join(groupByParts, ", ")
	}
	return sql, nil, nil
}
// buildFromClause renders the FROM part of the extract SQL: the source table
// with its alias, followed by one JOIN per entry of ec.JoinConfigs. A join
// type of INNER or RIGHT (case-insensitive) is honoured; anything else becomes
// a LEFT JOIN. A join without an alias is aliased with its own table name.
func (e *DailyExtractor) buildFromClause(ec *model.ExtractConfig, sourceTable string) string {
	var from strings.Builder
	fmt.Fprintf(&from, "%s %s", ec.SourceTableName, sourceTable)

	for _, jc := range ec.JoinConfigs {
		var keyword string
		switch strings.ToUpper(jc.JoinType) {
		case "INNER":
			keyword = "INNER JOIN"
		case "RIGHT":
			keyword = "RIGHT JOIN"
		default:
			keyword = "LEFT JOIN"
		}

		alias := jc.JoinAlias
		if alias == "" {
			alias = jc.JoinTable
		}

		fmt.Fprintf(&from, " %s %s %s ON %s", keyword, jc.JoinTable, alias, jc.JoinCondition)
	}
	return from.String()
}
// buildJoinFieldSelects returns the SELECT list entries for the fields mapped
// from joined tables. Each column is qualified with the join's alias (or its
// table name when no alias is set), passed through its transform rule when one
// is configured, and aliased to the mapping's target field. The result is nil
// when there is nothing to select.
func (e *DailyExtractor) buildJoinFieldSelects(ec *model.ExtractConfig) []string {
	var selects []string
	for _, jc := range ec.JoinConfigs {
		qualifier := jc.JoinAlias
		if qualifier == "" {
			qualifier = jc.JoinTable
		}

		for _, fm := range jc.FieldMappings {
			column := fmt.Sprintf("%s.%s", qualifier, fm.SourceField)
			if rule := fm.TransformRule; rule != nil {
				column = e.applyTransformExpr(rule, column)
			}
			selects = append(selects, fmt.Sprintf("%s AS %s", column, fm.TargetField))
		}
	}
	return selects
}
// buildWhereClause renders the WHERE clause of the extract SQL, or an empty
// string when no condition applies.
//
// For an incremental extract with a key field, rows are restricted to those
// whose key field, cast to date, equals statDate. statDate is embedded as a
// SQL string literal with its single quotes doubled, so a quote inside the
// value cannot terminate the literal. ec.FilterExpression, when set, is
// appended verbatim as an additional AND condition.
func (e *DailyExtractor) buildWhereClause(ec *model.ExtractConfig, sourceTable, statDate string) string {
	var conditions []string

	// Date restriction (incremental extract only).
	if ec.ExtractType == model.ExtractTypeIncremental && ec.ExtractKeyField != "" {
		dateLiteral := strings.ReplaceAll(statDate, "'", "''")
		conditions = append(conditions, fmt.Sprintf("%s.%s::date = '%s'", sourceTable, ec.ExtractKeyField, dateLiteral))
	}

	// Custom filter condition.
	if ec.FilterExpression != "" {
		conditions = append(conditions, ec.FilterExpression)
	}

	if len(conditions) == 0 {
		return ""
	}
	return "WHERE " + strings.Join(conditions, " AND ")
}
// applyTransformExpr wraps sourceExpr in the SQL expression described by rule
// and returns the result.
//
//   - CALCULATE: rule.Expression with every "{source}" placeholder replaced by
//     sourceExpr.
//   - FORMAT: TO_CHAR(sourceExpr, '<rule.Format>'); single quotes in the format
//     are doubled so they cannot terminate the literal.
//
// Every other rule type, and a CALCULATE or FORMAT rule with an empty
// expression or format, returns sourceExpr unchanged. MAPPING rules belong to
// that group: they are applied to the fetched rows by applyTransformRules
// rather than in SQL.
func (e *DailyExtractor) applyTransformExpr(rule *model.TransformRule, sourceExpr string) string {
	switch rule.RuleType {
	case "CALCULATE":
		if rule.Expression != "" {
			return strings.ReplaceAll(rule.Expression, "{source}", sourceExpr)
		}
	case "FORMAT":
		if rule.Format != "" {
			format := strings.ReplaceAll(rule.Format, "'", "''")
			return fmt.Sprintf("TO_CHAR(%s, '%s')", sourceExpr, format)
		}
	}
	return sourceExpr
}
// applyTransformRules applies the MAPPING rules of ec to one fetched row, in
// place. For such a rule, Expression holds the source field name and RuleCode
// the target field name: when the row has the source field and its string form
// is a key of rule.Mapping, the mapped value is stored under the target field.
// Rules of any other type are ignored here.
func (e *DailyExtractor) applyTransformRules(ec *model.ExtractConfig, row map[string]interface{}) {
	for _, rule := range ec.TransformRules {
		if rule.RuleType != "MAPPING" {
			continue
		}

		value, present := row[rule.Expression]
		if !present {
			continue
		}

		mapped, found := rule.Mapping[gconv.String(value)]
		if !found {
			continue
		}
		row[rule.RuleCode] = mapped
	}
}
// ensureStatTableExists creates the report's stat table when pg_tables has no
// entry whose tablename equals the lower-cased report.StatTableName. An
// existing table is left untouched.
func (e *DailyExtractor) ensureStatTableExists(ctx context.Context, report *model.ReportConfig, fieldMap map[string]*model.FieldConfig) error {
	tableName := report.StatTableName

	// Look the table up in the catalog.
	rows, err := gfdb.DB(ctx).GetAll(ctx, "SELECT COUNT(*) FROM pg_tables WHERE tablename = $1", strings.ToLower(tableName))
	if err != nil {
		return err
	}

	exists := len(rows) > 0 && rows[0]["count"].Int() != 0
	if !exists {
		// The table has to be created.
		return e.createStatTable(ctx, report, fieldMap)
	}

	logrus.Infof("统计宽表 %s 已存在", tableName)
	return nil
}
// createStatTable creates the report's stat table with the standard audit
// columns, the date column (report.DateField, or "stat_date" when empty), one
// column per configured field and a raw_data JSONB column. It then creates a
// unique index over report.ConflictKeys, if any, and adds the field names as
// column comments.
//
// Only a failure of CREATE TABLE is returned; index and comment failures are
// logged as warnings.
//
// NOTE(review): the business columns are emitted in map iteration order, so
// their order in the table is not deterministic.
// NOTE(review): the unique index is presumably what the ON CONFLICT upsert in
// batchUpsert relies on; if so, a failed index creation deserves more than a
// warning — confirm.
func (e *DailyExtractor) createStatTable(ctx context.Context, report *model.ReportConfig, fieldMap map[string]*model.FieldConfig) error {
	tableName := report.StatTableName

	dateField := report.DateField
	if dateField == "" {
		dateField = "stat_date"
	}

	columns := []string{
		// Standard audit columns.
		"id BIGSERIAL PRIMARY KEY",
		"tenant_id BIGINT NOT NULL DEFAULT 0",
		"business_code VARCHAR(64) NOT NULL DEFAULT ''",
		"creator VARCHAR(64) DEFAULT ''",
		"created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()",
		"updater VARCHAR(64) DEFAULT ''",
		"updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()",
		"deleted_at TIMESTAMP WITH TIME ZONE",
		// Date column.
		fmt.Sprintf("%s VARCHAR(16) NOT NULL DEFAULT ''", dateField),
	}
	// Business columns.
	for _, field := range fieldMap {
		columns = append(columns, fmt.Sprintf("%s %s", field.FieldCode, fieldTypeToPG(field.FieldType)))
	}
	// Raw data.
	columns = append(columns, "raw_data JSONB DEFAULT '{}'")

	createSQL := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (\n %s\n)", tableName, strings.Join(columns, ",\n "))
	logrus.Infof("创建统计宽表: %s", tableName)
	if _, err := gfdb.DB(ctx).Exec(ctx, createSQL); err != nil {
		return fmt.Errorf("建表失败: %w", err)
	}

	// Unique index over the conflict keys.
	if len(report.ConflictKeys) > 0 {
		indexSQL := fmt.Sprintf(
			"CREATE UNIQUE INDEX IF NOT EXISTS %s ON %s (%s)",
			fmt.Sprintf("uq_%s_conflict", tableName),
			tableName,
			strings.Join(report.ConflictKeys, ", "),
		)
		if _, err := gfdb.DB(ctx).Exec(ctx, indexSQL); err != nil {
			logrus.Warnf("创建冲突索引失败: %v", err)
		}
	}

	// Column comments; single quotes in the field name are doubled for the
	// SQL literal.
	for _, field := range fieldMap {
		if field.FieldName == "" {
			continue
		}
		comment := strings.ReplaceAll(field.FieldName, "'", "''")
		commentSQL := fmt.Sprintf("COMMENT ON COLUMN %s.%s IS '%s'", tableName, field.FieldCode, comment)
		if _, err := gfdb.DB(ctx).Exec(ctx, commentSQL); err != nil {
			logrus.Warnf("添加字段注释失败 [%s.%s]: %v", tableName, field.FieldCode, err)
		}
	}
	return nil
}
// batchUpsert writes rows into tableName in chunks of 100, using conflictKeys
// as the conflict target when any are given. Every row gets updated_at set to
// the current time first; nil rows are replaced by empty maps in place.
//
// When a chunk fails to save, its rows are retried one by one so that a single
// bad row does not discard the rest of the chunk. Rows that still fail are
// logged and left out of the count.
//
// The first result is the number of rows written. The column list is always
// nil and the error is always nil: failures are visible to the caller only as
// a count smaller than len(rows).
func (e *DailyExtractor) batchUpsert(ctx context.Context, tableName string, conflictKeys []string, rows []map[string]interface{}) (int, []string, error) {
	if len(rows) == 0 {
		return 0, nil, nil
	}

	now := time.Now()
	for i := range rows {
		if rows[i] == nil {
			rows[i] = make(map[string]interface{})
		}
		rows[i]["updated_at"] = now
	}

	// The conflict target is the same for every write; convert it once.
	keys := make([]interface{}, len(conflictKeys))
	for i, k := range conflictKeys {
		keys[i] = k
	}

	// save upserts data, which is either one row or a slice of rows.
	save := func(data interface{}) error {
		m := gfdb.DB(ctx).Model(ctx, tableName).Data(data)
		if len(keys) > 0 {
			m = m.OnConflict(keys...)
		}
		_, err := m.Save()
		return err
	}

	const chunkSize = 100
	written := 0
	for i := 0; i < len(rows); i += chunkSize {
		end := i + chunkSize
		if end > len(rows) {
			end = len(rows)
		}
		chunk := rows[i:end]

		chunkErr := save(chunk)
		if chunkErr == nil {
			written += len(chunk)
			continue
		}
		logrus.Errorf("批量写入 %s 失败: %v", tableName, chunkErr)

		// Retry row by row.
		for _, row := range chunk {
			if rowErr := save(row); rowErr != nil {
				logrus.Errorf("逐条写入失败: %v", rowErr)
				continue
			}
			written++
		}
	}
	return written, nil, nil
}
// fieldTypeToPG maps a configured field type to the PostgreSQL column
// definition (type plus default, where one applies) used for it in the stat
// table. Unknown types fall back to VARCHAR(256).
func fieldTypeToPG(fieldType string) string {
	pgType := "VARCHAR(256) DEFAULT ''"
	switch fieldType {
	case model.FieldTypeInt:
		pgType = "NUMERIC(20,0) DEFAULT 0"
	case model.FieldTypeFloat:
		pgType = "NUMERIC(20,4) DEFAULT 0"
	case model.FieldTypeDate:
		pgType = "VARCHAR(16) DEFAULT ''"
	case model.FieldTypeDatetime:
		pgType = "TIMESTAMP WITH TIME ZONE"
	case model.FieldTypeJsonb:
		pgType = "JSONB DEFAULT '{}'"
	}
	return pgType
}