// 实物库存批次DAO层 // 职责:CRUD、IncrementAvailableQty原子操作(防并发超卖)、SumAvailableQtyByLocation聚合汇总 // 紧密耦合:service.PrivateStock、service.Capacity(容量计算入口) // 注意:IncrementAvailableQty使用$inc+$gte条件,防止并发导致库存变负 package dao import ( "assets/consts/public" "assets/consts/stock" dto "assets/model/dto/stock" entity "assets/model/entity/stock" "context" "gitea.com/red-future/common/beans" "gitea.com/red-future/common/db/mongo" "gitea.com/red-future/common/utils" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" "go.mongodb.org/mongo-driver/v2/bson" ) var PrivateStock = new(privateStock) type privateStock struct { } // Insert 插入私域库存 func (d *privateStock) Insert(ctx context.Context, req *dto.CreatePrivateStockReq) (ids []interface{}, err error) { var result *entity.PrivateStock if err = utils.Struct(req, &result); err != nil { return } // 设置StockType(如果未指定,默认为PrivateStock类型) if !g.IsEmpty(req.StockType) { result.StockType = req.StockType } else { result.StockType = stock.StockLocationTypePrivateStock } // 获取仓库信息(非必填) if req.WarehouseId != nil && !req.WarehouseId.IsZero() { warehouse, err := Warehouse.GetOne(ctx, &dto.GetWarehouseReq{Id: req.WarehouseId}) if err != nil { return nil, err } result.WarehouseID = req.WarehouseId result.WarehouseCode = warehouse.WarehouseCode result.WarehouseName = warehouse.WarehouseName } // 如果有库区信息 if req.ZoneId != nil && !req.ZoneId.IsZero() { zone, err := Zone.GetOne(ctx, &dto.GetZoneReq{Id: req.ZoneId}) if err != nil { return nil, err } result.ZoneID = req.ZoneId result.ZoneCode = zone.ZoneCode result.ZoneName = zone.ZoneName result.ZoneType = zone.ZoneType } // 如果有库位信息 if req.LocationId != nil && !req.LocationId.IsZero() { location, err := Location.GetOne(ctx, &dto.GetLocationReq{Id: req.LocationId}) if err != nil { return nil, err } result.LocationID = req.LocationId result.LocationCode = location.LocationCode result.LocationName = location.LocationName result.LocationType = location.LocationType } ids, err = mongo.DB().Insert(ctx, []interface{}{&result}, public.PrivateStockCollection) return } // Update 更新私域库存 func (d *privateStock) Update(ctx context.Context, req *dto.UpdatePrivateStockReq) (err error) { filter := bson.M{"_id": req.Id} update := bson.M{"$set": bson.M{}} if req.WarehouseId != nil && !req.WarehouseId.IsZero() { warehouse, err := Warehouse.GetOne(ctx, &dto.GetWarehouseReq{Id: req.WarehouseId}) if err != nil { return err } update["$set"].(bson.M)["warehouseId"] = req.WarehouseId update["$set"].(bson.M)["warehouseCode"] = warehouse.WarehouseCode update["$set"].(bson.M)["warehouseName"] = warehouse.WarehouseName } if req.ZoneId != nil && !req.ZoneId.IsZero() { zone, err := Zone.GetOne(ctx, &dto.GetZoneReq{Id: req.ZoneId}) if err != nil { return err } update["$set"].(bson.M)["zoneId"] = req.ZoneId update["$set"].(bson.M)["zoneCode"] = zone.ZoneCode update["$set"].(bson.M)["zoneName"] = zone.ZoneName update["$set"].(bson.M)["zoneType"] = zone.ZoneType } if req.LocationId != nil && !req.LocationId.IsZero() { location, err := Location.GetOne(ctx, &dto.GetLocationReq{Id: req.LocationId}) if err != nil { return err } update["$set"].(bson.M)["locationId"] = req.LocationId update["$set"].(bson.M)["locationCode"] = location.LocationCode update["$set"].(bson.M)["locationName"] = location.LocationName update["$set"].(bson.M)["locationType"] = location.LocationType } if req.PrivateSkuID != nil { update["$set"].(bson.M)["privateSkuId"] = req.PrivateSkuID } if !g.IsEmpty(req.BatchNo) { update["$set"].(bson.M)["batchNo"] = req.BatchNo } if req.BatchQty > 0 { update["$set"].(bson.M)["batchQty"] = req.BatchQty } // AvailableQty已移除全量覆盖,请使用IncrementAvailableQty方法进行增量更新 if req.BatchStatus != nil { update["$set"].(bson.M)["batchStatus"] = req.BatchStatus } if req.StockStatus != nil { update["$set"].(bson.M)["stockStatus"] = req.StockStatus } if req.SupplierID != nil { update["$set"].(bson.M)["supplierId"] = req.SupplierID } if req.SupportsRecycle != nil { update["$set"].(bson.M)["supportsRecycle"] = *req.SupportsRecycle } if req.ProductionDate != nil { update["$set"].(bson.M)["productionDate"] = req.ProductionDate } if req.ExpiryDate != nil { update["$set"].(bson.M)["expiryDate"] = req.ExpiryDate } if req.ExpiryWarningDate != nil { update["$set"].(bson.M)["expiryWarningDate"] = req.ExpiryWarningDate } if !g.IsEmpty(req.PrivateCategoryPath) { update["$set"].(bson.M)["privateCategoryPath"] = req.PrivateCategoryPath } if !g.IsEmpty(req.StockType) { update["$set"].(bson.M)["stockType"] = req.StockType } _, err = mongo.DB().Update(ctx, filter, update, public.PrivateStockCollection) return } // DeleteFake 软删除私域库存 func (d *privateStock) DeleteFake(ctx context.Context, req *dto.DeletePrivateStockReq) error { filter := bson.M{"_id": req.Id} _, err := mongo.DB().DeleteSoft(ctx, filter, public.PrivateStockCollection) return err } // GetOne 根据ID查询私域库存 func (d *privateStock) GetOne(ctx context.Context, req *dto.GetPrivateStockReq) (res *entity.PrivateStock, err error) { filter := bson.M{"_id": req.Id} err = mongo.DB().FindOne(ctx, filter, &res, public.PrivateStockCollection) return } // IncrementAvailableQty 增量更新可用数量(并发安全) // deltaQty:正数表示增加(入库),负数表示减少(出库) // 减少时自动添加 availableQty >= |deltaQty| 条件,防止并发导致库存变负 func (d *privateStock) IncrementAvailableQty(ctx context.Context, id *bson.ObjectID, deltaQty int) (err error) { filter := bson.M{"_id": id} if deltaQty < 0 { filter["availableQty"] = bson.M{"$gte": -deltaQty} } update := bson.M{ "$inc": bson.M{ "availableQty": deltaQty, }, } modifiedCount, err := mongo.DB().Update(ctx, filter, update, public.PrivateStockCollection) if err != nil { return } if deltaQty < 0 && modifiedCount == 0 { err = gerror.Newf("库存不足,无法减少%d", -deltaQty) } return } // List 查询私域库存列表 func (d *privateStock) List(ctx context.Context, req *dto.ListPrivateStockReq) (res []entity.PrivateStock, total int64, err error) { filter := bson.M{} if req.WarehouseId != nil && !req.WarehouseId.IsZero() { filter["warehouseId"] = req.WarehouseId } if req.ZoneId != nil && !req.ZoneId.IsZero() { filter["zoneId"] = req.ZoneId } if req.LocationId != nil && !req.LocationId.IsZero() { filter["locationId"] = req.LocationId } if req.PrivateSkuID != nil && !req.PrivateSkuID.IsZero() { filter["privateSkuId"] = req.PrivateSkuID } if req.BatchStatus != nil { filter["batchStatus"] = req.BatchStatus } if req.StockStatus != nil { filter["stockStatus"] = req.StockStatus } if req.SupplierID != nil && !req.SupplierID.IsZero() { filter["supplierId"] = req.SupplierID } if !g.IsEmpty(req.PrivateCategoryPath) { filter["privateCategoryPath"] = req.PrivateCategoryPath } if !g.IsEmpty(req.StockType) { filter["stockType"] = req.StockType } // 默认排序 if len(req.OrderBy) == 0 { req.OrderBy = []beans.OrderBy{ {Field: "createdAt", Order: beans.Desc}, } } total, err = mongo.DB().Find(ctx, filter, &res, public.PrivateStockCollection, req.Page, req.OrderBy) return } // SumAvailableQtyByLocation 按库位ID汇总所有库存的可用数量(使用MongoDB聚合管道) func (d *privateStock) SumAvailableQtyByLocation(ctx context.Context, locationId *bson.ObjectID) (totalQty int, err error) { pipeline := bson.A{ bson.M{"$match": bson.M{ "locationId": locationId, "isDeleted": false, }}, bson.M{"$group": bson.M{ "_id": nil, "totalQty": bson.M{"$sum": "$availableQty"}, }}, } coll := mongo.GetDB().Collection(public.PrivateStockCollection) cursor, err := coll.Aggregate(ctx, pipeline) if err != nil { return } defer cursor.Close(ctx) var results []struct { TotalQty int `bson:"totalQty"` } if err = cursor.All(ctx, &results); err != nil { return } if len(results) > 0 { totalQty = results[0].TotalQty } return }