// 库位容量管理服务 // 职责:库位/库区/仓库三级容量计算与同步,支持整入整出换算 // 调用链:PrivateStock.Create/Update/Delete → UpdateLocationCapacity → SyncCapacityToZone → SyncCapacityToWarehouse // 紧密耦合:dao.Location(更新容量)、dao.Zone(汇总)、dao.Warehouse(汇总)、dao.UnitConversion(单位换算) // 注意:使用Redis分布式锁防止并发重算覆盖,锁key格式 lock:location:{id}:capacity package service import ( "assets/consts/public" "assets/consts/stock" dao "assets/dao/stock" dto "assets/model/dto/stock" entityAsset "assets/model/entity/asset" "context" "fmt" "math" "gitea.com/red-future/common/db/mongo" "gitea.com/red-future/common/jaeger" "gitea.com/red-future/common/utils" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" "go.mongodb.org/mongo-driver/v2/bson" ) var Capacity = new(capacity) type capacity struct{} // UpdateLocationCapacity 更新库位容量(入口方法,带Redis分布式锁) func (s *capacity) UpdateLocationCapacity(ctx context.Context, locationId *bson.ObjectID) (err error) { // Redis分布式锁(防止并发入库/出库同一库位时重算覆盖) lockKey := fmt.Sprintf("lock:location:%s:capacity", locationId.Hex()) expireSeconds := int64(30) var zoneId *bson.ObjectID var capacityUnitType stock.CapacityUnitType success, err := utils.Lock(ctx, lockKey, expireSeconds, func(ctx context.Context) error { // 1. 查询库位信息 location, err := dao.Location.GetOne(ctx, &dto.GetLocationReq{Id: locationId}) if err != nil { g.Log().Errorf(ctx, "查询库位失败: %v", err) return err } zoneId = location.ZoneId capacityUnitType = location.CapacityUnitType // 2. 查询库位下所有库存记录 privateStocks, _, err := dao.PrivateStock.List(ctx, &dto.ListPrivateStockReq{ LocationId: locationId, }) if err != nil { g.Log().Errorf(ctx, "查询库位库存失败: %v", err) return err } // 3. 批量查询PrivateSku(避免N+1查询问题) skuIds := make([]*bson.ObjectID, 0, len(privateStocks)) for _, ps := range privateStocks { if ps.PrivateSkuID != nil && ps.AvailableQty > 0 { skuIds = append(skuIds, ps.PrivateSkuID) } } // 批量查询PrivateSku并构建Map缓存 skuMap := make(map[string]*entityAsset.PrivateSku) if len(skuIds) > 0 { var skuList []*entityAsset.PrivateSku filter := bson.M{"_id": bson.M{"$in": skuIds}} _, err = mongo.DB().Find(ctx, filter, &skuList, public.PrivateSkuCollection, nil, nil) if err != nil { g.Log().Errorf(ctx, "批量查询PrivateSku失败: %v", err) return err } // 构建Map缓存 for i := range skuList { skuMap[skuList[i].Id.Hex()] = skuList[i] } } // 4. 整入整出计算:先按SKU聚合同库位的总数量(同SKU可合箱),再按库位单位换算 // 聚合同一SKU的总数量,避免逐条取整导致容量虚高 // 例:2批次各1瓶,逐条取整=2箱(错误),聚合后取整=ceil(2/20)=1箱(正确) skuQtyMap := make(map[string]int) // key: privateSkuId.Hex(), value: 总可用数量 for _, ps := range privateStocks { if ps.PrivateSkuID == nil || ps.AvailableQty <= 0 { continue } skuQtyMap[ps.PrivateSkuID.Hex()] += ps.AvailableQty } totalCapacity := 0 for skuIdHex, totalQty := range skuQtyMap { // 从Map缓存中获取PrivateSku privateSku, exists := skuMap[skuIdHex] if !exists || privateSku == nil { g.Log().Warningf(ctx, "PrivateSku不存在,跳过: %s", skuIdHex) continue } // 检查location和privateSku的Capacity是否为nil if location.Capacity == nil || privateSku.Capacity.CapacityUnit == "" { g.Log().Warningf(ctx, "库位或SKU容量信息不完整,跳过") continue } // 如果库存单位与库位单位相同,直接累加 if privateSku.Capacity.CapacityUnit == location.Capacity.CapacityUnit { totalCapacity += totalQty continue } // 不同单位需要换算 conversion, err := dao.UnitConversion.GetByUnits(ctx, location.CapacityUnitType, privateSku.Capacity.CapacityUnit, location.Capacity.CapacityUnit, ) if err != nil { err = gerror.Newf("未找到单位换算规则 %s→%s,请在系统中添加该换算规则", privateSku.Capacity.CapacityUnit, location.Capacity.CapacityUnit) jaeger.RecordError(ctx, err) return err } // 检查换算系数是否为0,防止除零错误 if conversion.ConversionFactor == 0 { err = gerror.Newf("换算系数为0:%s→%s,请检查换算规则配置", privateSku.Capacity.CapacityUnit, location.Capacity.CapacityUnit) jaeger.RecordError(ctx, err) return err } // 向上取整计算(整入整出:同SKU合箱后取整,不足一箱按一箱计) convertedQty := int(math.Ceil(float64(totalQty) / conversion.ConversionFactor)) totalCapacity += convertedQty g.Log().Debugf(ctx, "单位换算: %d%s ÷ %.2f = %d%s", totalQty, privateSku.Capacity.CapacityUnit, conversion.ConversionFactor, convertedQty, location.Capacity.CapacityUnit) } currentCapacity := totalCapacity // 5. 更新库位容量 err = dao.Location.UpdateCapacity(ctx, locationId, currentCapacity) if err != nil { g.Log().Errorf(ctx, "更新库位容量失败: %v", err) return err } g.Log().Infof(ctx, "库位容量更新成功: locationId=%s, 当前容量=%d", locationId.Hex(), currentCapacity) return nil }) if !success { return fmt.Errorf("获取库位容量锁失败: %v", err) } if err != nil { return } // 6. 触发向上汇总到库区(在锁外执行,避免嵌套锁时间过长) if zoneId != nil && !zoneId.IsZero() { if syncErr := s.SyncCapacityToZone(ctx, zoneId, capacityUnitType); syncErr != nil { g.Log().Errorf(ctx, "同步库区容量失败: %v", syncErr) } } return } // SyncCapacityToZone 同步容量到库区(带Redis分布式锁) func (s *capacity) SyncCapacityToZone(ctx context.Context, zoneId *bson.ObjectID, unitType stock.CapacityUnitType) (err error) { // 1. Redis分布式锁 lockKey := fmt.Sprintf("lock:zone:%s:capacity:%s", zoneId.Hex(), unitType) expireSeconds := int64(30) // 30秒超时 success, err := utils.Lock(ctx, lockKey, expireSeconds, func(ctx context.Context) error { // 2. 查询该库区下所有使用该单位类型的库位 locations, err := dao.Location.ListByZoneAndUnitType(ctx, zoneId, unitType) if err != nil { return fmt.Errorf("查询库位列表失败: %v", err) } // 3. 汇总所有库位的当前容量 totalCapacity := 0 maxCapacity := 0 var capacityUnit string for _, loc := range locations { if loc.Capacity != nil { totalCapacity += loc.Capacity.CurrentCapacity maxCapacity += loc.Capacity.MaxCapacity if capacityUnit == "" { capacityUnit = loc.Capacity.CapacityUnit } } } // 4. 查询库区信息(获取warehouseId) zone, err := dao.Zone.GetOne(ctx, &dto.GetZoneReq{Id: zoneId}) if err != nil { return fmt.Errorf("查询库区失败: %v", err) } // 5. 更新库区该单位类型的容量 err = dao.Zone.UpdateCapacityByUnitType(ctx, zoneId, unitType, totalCapacity, maxCapacity, capacityUnit) if err != nil { return fmt.Errorf("更新库区容量失败: %v", err) } g.Log().Infof(ctx, "库区容量同步成功: zoneId=%s, unitType=%s, 当前容量=%d", zoneId.Hex(), unitType, totalCapacity) // 6. 触发向上汇总到仓库 if zone.WarehouseId != "" { warehouseObjId, hexErr := bson.ObjectIDFromHex(zone.WarehouseId) if hexErr != nil { g.Log().Errorf(ctx, "库区WarehouseId格式错误: %s, %v", zone.WarehouseId, hexErr) } else { if syncErr := s.SyncCapacityToWarehouse(ctx, &warehouseObjId, unitType); syncErr != nil { g.Log().Errorf(ctx, "同步仓库容量失败: %v", syncErr) } } } return nil }) if !success { return fmt.Errorf("获取Redis锁失败: %v", err) } return } // SyncCapacityToWarehouse 同步容量到仓库(带Redis分布式锁) func (s *capacity) SyncCapacityToWarehouse(ctx context.Context, warehouseId *bson.ObjectID, unitType stock.CapacityUnitType) (err error) { // 1. Redis分布式锁 lockKey := fmt.Sprintf("lock:warehouse:%s:capacity:%s", warehouseId.Hex(), unitType) expireSeconds := int64(30) // 30秒超时 success, err := utils.Lock(ctx, lockKey, expireSeconds, func(ctx context.Context) error { // 2. 查询该仓库下所有库区 zones, err := dao.Zone.ListByWarehouseAndUnitType(ctx, warehouseId.Hex()) if err != nil { return fmt.Errorf("查询库区列表失败: %v", err) } // 3. 汇总所有库区该单位类型的容量 totalCapacity := 0 maxCapacity := 0 var capacityUnit string for _, zone := range zones { if zone.Capacity != nil { if cap, exists := (*zone.Capacity)[unitType]; exists { totalCapacity += cap.CurrentCapacity maxCapacity += cap.MaxCapacity if capacityUnit == "" { capacityUnit = cap.CapacityUnit } } } } // 4. 更新仓库该单位类型的容量 err = dao.Warehouse.UpdateCapacityByUnitType(ctx, warehouseId, unitType, totalCapacity, maxCapacity, capacityUnit) if err != nil { return fmt.Errorf("更新仓库容量失败: %v", err) } g.Log().Infof(ctx, "仓库容量同步成功: warehouseId=%s, unitType=%s, 当前容量=%d", warehouseId.Hex(), unitType, totalCapacity) return nil }) if !success { return fmt.Errorf("获取Redis锁失败: %v", err) } return } // ConvertWithCeil 向上取整换算(用于容量计算) func (s *capacity) ConvertWithCeil(fromQty int, conversionFactor float64) int { return int(math.Ceil(float64(fromQty) / conversionFactor)) }