2026-03-18 10:18:03 +08:00
|
|
|
|
// 库位容量管理服务
|
|
|
|
|
|
// 职责:库位/库区/仓库三级容量计算与同步,支持整入整出换算
|
|
|
|
|
|
// 调用链:PrivateStock.Create/Update/Delete → UpdateLocationCapacity → SyncCapacityToZone → SyncCapacityToWarehouse
|
|
|
|
|
|
// 紧密耦合:dao.Location(更新容量)、dao.Zone(汇总)、dao.Warehouse(汇总)、dao.UnitConversion(单位换算)
|
|
|
|
|
|
// 注意:使用Redis分布式锁防止并发重算覆盖,锁key格式 lock:location:{id}:capacity
|
|
|
|
|
|
package service
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"assets/consts/public"
|
|
|
|
|
|
"assets/consts/stock"
|
|
|
|
|
|
dao "assets/dao/stock"
|
|
|
|
|
|
dto "assets/model/dto/stock"
|
|
|
|
|
|
entityAsset "assets/model/entity/asset"
|
|
|
|
|
|
"context"
|
|
|
|
|
|
"fmt"
|
|
|
|
|
|
"math"
|
|
|
|
|
|
|
|
|
|
|
|
"gitea.com/red-future/common/db/mongo"
|
|
|
|
|
|
"gitea.com/red-future/common/jaeger"
|
2026-03-27 09:49:44 +08:00
|
|
|
|
"gitea.com/red-future/common/utils"
|
2026-03-18 10:18:03 +08:00
|
|
|
|
"github.com/gogf/gf/v2/errors/gerror"
|
|
|
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
|
|
|
|
"go.mongodb.org/mongo-driver/v2/bson"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
var Capacity = new(capacity)
|
|
|
|
|
|
|
|
|
|
|
|
type capacity struct{}
|
|
|
|
|
|
|
|
|
|
|
|
// UpdateLocationCapacity 更新库位容量(入口方法,带Redis分布式锁)
|
|
|
|
|
|
func (s *capacity) UpdateLocationCapacity(ctx context.Context, locationId *bson.ObjectID) (err error) {
|
|
|
|
|
|
// Redis分布式锁(防止并发入库/出库同一库位时重算覆盖)
|
|
|
|
|
|
lockKey := fmt.Sprintf("lock:location:%s:capacity", locationId.Hex())
|
|
|
|
|
|
expireSeconds := int64(30)
|
|
|
|
|
|
|
|
|
|
|
|
var zoneId *bson.ObjectID
|
|
|
|
|
|
var capacityUnitType stock.CapacityUnitType
|
|
|
|
|
|
|
2026-03-27 09:49:44 +08:00
|
|
|
|
success, err := utils.Lock(ctx, lockKey, expireSeconds, func(ctx context.Context) error {
|
2026-03-18 10:18:03 +08:00
|
|
|
|
// 1. 查询库位信息
|
|
|
|
|
|
location, err := dao.Location.GetOne(ctx, &dto.GetLocationReq{Id: locationId})
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "查询库位失败: %v", err)
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
zoneId = location.ZoneId
|
|
|
|
|
|
capacityUnitType = location.CapacityUnitType
|
|
|
|
|
|
|
|
|
|
|
|
// 2. 查询库位下所有库存记录
|
|
|
|
|
|
privateStocks, _, err := dao.PrivateStock.List(ctx, &dto.ListPrivateStockReq{
|
|
|
|
|
|
LocationId: locationId,
|
|
|
|
|
|
})
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "查询库位库存失败: %v", err)
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 3. 批量查询PrivateSku(避免N+1查询问题)
|
|
|
|
|
|
skuIds := make([]*bson.ObjectID, 0, len(privateStocks))
|
|
|
|
|
|
for _, ps := range privateStocks {
|
|
|
|
|
|
if ps.PrivateSkuID != nil && ps.AvailableQty > 0 {
|
|
|
|
|
|
skuIds = append(skuIds, ps.PrivateSkuID)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 批量查询PrivateSku并构建Map缓存
|
|
|
|
|
|
skuMap := make(map[string]*entityAsset.PrivateSku)
|
|
|
|
|
|
if len(skuIds) > 0 {
|
|
|
|
|
|
var skuList []*entityAsset.PrivateSku
|
|
|
|
|
|
filter := bson.M{"_id": bson.M{"$in": skuIds}}
|
|
|
|
|
|
_, err = mongo.DB().Find(ctx, filter, &skuList, public.PrivateSkuCollection, nil, nil)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "批量查询PrivateSku失败: %v", err)
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
// 构建Map缓存
|
|
|
|
|
|
for i := range skuList {
|
|
|
|
|
|
skuMap[skuList[i].Id.Hex()] = skuList[i]
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 4. 整入整出计算:先按SKU聚合同库位的总数量(同SKU可合箱),再按库位单位换算
|
|
|
|
|
|
// 聚合同一SKU的总数量,避免逐条取整导致容量虚高
|
|
|
|
|
|
// 例:2批次各1瓶,逐条取整=2箱(错误),聚合后取整=ceil(2/20)=1箱(正确)
|
|
|
|
|
|
skuQtyMap := make(map[string]int) // key: privateSkuId.Hex(), value: 总可用数量
|
|
|
|
|
|
for _, ps := range privateStocks {
|
|
|
|
|
|
if ps.PrivateSkuID == nil || ps.AvailableQty <= 0 {
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
skuQtyMap[ps.PrivateSkuID.Hex()] += ps.AvailableQty
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
totalCapacity := 0
|
|
|
|
|
|
for skuIdHex, totalQty := range skuQtyMap {
|
|
|
|
|
|
// 从Map缓存中获取PrivateSku
|
|
|
|
|
|
privateSku, exists := skuMap[skuIdHex]
|
|
|
|
|
|
if !exists || privateSku == nil {
|
|
|
|
|
|
g.Log().Warningf(ctx, "PrivateSku不存在,跳过: %s", skuIdHex)
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 检查location和privateSku的Capacity是否为nil
|
|
|
|
|
|
if location.Capacity == nil || privateSku.Capacity.CapacityUnit == "" {
|
|
|
|
|
|
g.Log().Warningf(ctx, "库位或SKU容量信息不完整,跳过")
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 如果库存单位与库位单位相同,直接累加
|
|
|
|
|
|
if privateSku.Capacity.CapacityUnit == location.Capacity.CapacityUnit {
|
|
|
|
|
|
totalCapacity += totalQty
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 不同单位需要换算
|
|
|
|
|
|
conversion, err := dao.UnitConversion.GetByUnits(ctx,
|
|
|
|
|
|
location.CapacityUnitType,
|
|
|
|
|
|
privateSku.Capacity.CapacityUnit,
|
|
|
|
|
|
location.Capacity.CapacityUnit,
|
|
|
|
|
|
)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
err = gerror.Newf("未找到单位换算规则 %s→%s,请在系统中添加该换算规则",
|
|
|
|
|
|
privateSku.Capacity.CapacityUnit, location.Capacity.CapacityUnit)
|
|
|
|
|
|
jaeger.RecordError(ctx, err)
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 检查换算系数是否为0,防止除零错误
|
|
|
|
|
|
if conversion.ConversionFactor == 0 {
|
|
|
|
|
|
err = gerror.Newf("换算系数为0:%s→%s,请检查换算规则配置",
|
|
|
|
|
|
privateSku.Capacity.CapacityUnit, location.Capacity.CapacityUnit)
|
|
|
|
|
|
jaeger.RecordError(ctx, err)
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 向上取整计算(整入整出:同SKU合箱后取整,不足一箱按一箱计)
|
|
|
|
|
|
convertedQty := int(math.Ceil(float64(totalQty) / conversion.ConversionFactor))
|
|
|
|
|
|
totalCapacity += convertedQty
|
|
|
|
|
|
|
|
|
|
|
|
g.Log().Debugf(ctx, "单位换算: %d%s ÷ %.2f = %d%s",
|
|
|
|
|
|
totalQty, privateSku.Capacity.CapacityUnit,
|
|
|
|
|
|
conversion.ConversionFactor, convertedQty, location.Capacity.CapacityUnit)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
currentCapacity := totalCapacity
|
|
|
|
|
|
|
|
|
|
|
|
// 5. 更新库位容量
|
|
|
|
|
|
err = dao.Location.UpdateCapacity(ctx, locationId, currentCapacity)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "更新库位容量失败: %v", err)
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
g.Log().Infof(ctx, "库位容量更新成功: locationId=%s, 当前容量=%d",
|
|
|
|
|
|
locationId.Hex(), currentCapacity)
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
if !success {
|
|
|
|
|
|
return fmt.Errorf("获取库位容量锁失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 6. 触发向上汇总到库区(在锁外执行,避免嵌套锁时间过长)
|
|
|
|
|
|
if zoneId != nil && !zoneId.IsZero() {
|
|
|
|
|
|
if syncErr := s.SyncCapacityToZone(ctx, zoneId, capacityUnitType); syncErr != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "同步库区容量失败: %v", syncErr)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// SyncCapacityToZone 同步容量到库区(带Redis分布式锁)
|
|
|
|
|
|
func (s *capacity) SyncCapacityToZone(ctx context.Context, zoneId *bson.ObjectID, unitType stock.CapacityUnitType) (err error) {
|
|
|
|
|
|
// 1. Redis分布式锁
|
|
|
|
|
|
lockKey := fmt.Sprintf("lock:zone:%s:capacity:%s", zoneId.Hex(), unitType)
|
|
|
|
|
|
expireSeconds := int64(30) // 30秒超时
|
|
|
|
|
|
|
2026-03-27 09:49:44 +08:00
|
|
|
|
success, err := utils.Lock(ctx, lockKey, expireSeconds, func(ctx context.Context) error {
|
2026-03-18 10:18:03 +08:00
|
|
|
|
// 2. 查询该库区下所有使用该单位类型的库位
|
|
|
|
|
|
locations, err := dao.Location.ListByZoneAndUnitType(ctx, zoneId, unitType)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("查询库位列表失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 3. 汇总所有库位的当前容量
|
|
|
|
|
|
totalCapacity := 0
|
|
|
|
|
|
maxCapacity := 0
|
|
|
|
|
|
var capacityUnit string
|
|
|
|
|
|
for _, loc := range locations {
|
|
|
|
|
|
if loc.Capacity != nil {
|
|
|
|
|
|
totalCapacity += loc.Capacity.CurrentCapacity
|
|
|
|
|
|
maxCapacity += loc.Capacity.MaxCapacity
|
|
|
|
|
|
if capacityUnit == "" {
|
|
|
|
|
|
capacityUnit = loc.Capacity.CapacityUnit
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 4. 查询库区信息(获取warehouseId)
|
|
|
|
|
|
zone, err := dao.Zone.GetOne(ctx, &dto.GetZoneReq{Id: zoneId})
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("查询库区失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 5. 更新库区该单位类型的容量
|
|
|
|
|
|
err = dao.Zone.UpdateCapacityByUnitType(ctx, zoneId, unitType, totalCapacity, maxCapacity, capacityUnit)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("更新库区容量失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
g.Log().Infof(ctx, "库区容量同步成功: zoneId=%s, unitType=%s, 当前容量=%d",
|
|
|
|
|
|
zoneId.Hex(), unitType, totalCapacity)
|
|
|
|
|
|
|
|
|
|
|
|
// 6. 触发向上汇总到仓库
|
|
|
|
|
|
if zone.WarehouseId != "" {
|
|
|
|
|
|
warehouseObjId, hexErr := bson.ObjectIDFromHex(zone.WarehouseId)
|
|
|
|
|
|
if hexErr != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "库区WarehouseId格式错误: %s, %v", zone.WarehouseId, hexErr)
|
|
|
|
|
|
} else {
|
|
|
|
|
|
if syncErr := s.SyncCapacityToWarehouse(ctx, &warehouseObjId, unitType); syncErr != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "同步仓库容量失败: %v", syncErr)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
if !success {
|
|
|
|
|
|
return fmt.Errorf("获取Redis锁失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// SyncCapacityToWarehouse 同步容量到仓库(带Redis分布式锁)
|
|
|
|
|
|
func (s *capacity) SyncCapacityToWarehouse(ctx context.Context, warehouseId *bson.ObjectID, unitType stock.CapacityUnitType) (err error) {
|
|
|
|
|
|
// 1. Redis分布式锁
|
|
|
|
|
|
lockKey := fmt.Sprintf("lock:warehouse:%s:capacity:%s", warehouseId.Hex(), unitType)
|
|
|
|
|
|
expireSeconds := int64(30) // 30秒超时
|
|
|
|
|
|
|
2026-03-27 09:49:44 +08:00
|
|
|
|
success, err := utils.Lock(ctx, lockKey, expireSeconds, func(ctx context.Context) error {
|
2026-03-18 10:18:03 +08:00
|
|
|
|
// 2. 查询该仓库下所有库区
|
|
|
|
|
|
zones, err := dao.Zone.ListByWarehouseAndUnitType(ctx, warehouseId.Hex())
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("查询库区列表失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 3. 汇总所有库区该单位类型的容量
|
|
|
|
|
|
totalCapacity := 0
|
|
|
|
|
|
maxCapacity := 0
|
|
|
|
|
|
var capacityUnit string
|
|
|
|
|
|
for _, zone := range zones {
|
|
|
|
|
|
if zone.Capacity != nil {
|
|
|
|
|
|
if cap, exists := (*zone.Capacity)[unitType]; exists {
|
|
|
|
|
|
totalCapacity += cap.CurrentCapacity
|
|
|
|
|
|
maxCapacity += cap.MaxCapacity
|
|
|
|
|
|
if capacityUnit == "" {
|
|
|
|
|
|
capacityUnit = cap.CapacityUnit
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 4. 更新仓库该单位类型的容量
|
|
|
|
|
|
err = dao.Warehouse.UpdateCapacityByUnitType(ctx, warehouseId, unitType, totalCapacity, maxCapacity, capacityUnit)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("更新仓库容量失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
g.Log().Infof(ctx, "仓库容量同步成功: warehouseId=%s, unitType=%s, 当前容量=%d",
|
|
|
|
|
|
warehouseId.Hex(), unitType, totalCapacity)
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
if !success {
|
|
|
|
|
|
return fmt.Errorf("获取Redis锁失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ConvertWithCeil 向上取整换算(用于容量计算)
|
|
|
|
|
|
func (s *capacity) ConvertWithCeil(fromQty int, conversionFactor float64) int {
|
|
|
|
|
|
return int(math.Ceil(float64(fromQty) / conversionFactor))
|
|
|
|
|
|
}
|