// Package service 库存管理服务(Stock公共库存) // 职责:入库/出库操作,支持明细模式(StockDetails)和批次模式(StockBatch) // 调用链:Controller → StockOperation → stockPublishMessage → NATS → AddStock(消费者) // 紧密耦合:dao.StockDetails、dao.StockBatch、dao.AssetSku(更新库存数)、common/message(NATS发布) // 注意:移库/调拨是PrivateStock专属操作,不在此实现 package service import ( "assets/consts/public" "assets/consts/stock" assetDao "assets/dao/asset" dao "assets/dao/stock" assetDto "assets/model/dto/asset" stockDto "assets/model/dto/stock" assetEntity "assets/model/entity/asset" "context" "fmt" "gitea.redpowerfuture.com/red-future/common/beans" "gitea.redpowerfuture.com/red-future/common/utils" gmq "github.com/bjang03/gmq/core/gmq" "github.com/bjang03/gmq/mq" "github.com/bjang03/gmq/types" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" ) type stockManage struct{} // StockManage 库存管理服务(Stock公共库存) // 职责: // 1. 入库/出库:StockDetails(明细模式)和 StockBatch(批次模式)的库存操作 // 2. 库存查询表单字段生成 // 注意:移库(MoveStock)和调拨(TransferStock)是PrivateStock专属操作,不要在这里实现 var StockManage = new(stockManage) // GetStockFormFields 获取库存操作表单字段 func (s *stockManage) GetStockFormFields(ctx context.Context, req *stockDto.GetStockFormFieldsReq) (*stockDto.GetStockFormFieldsRes, error) { // 获取资产SKU信息 assetSku, err := assetDao.AssetSku.GetOne(ctx, &assetDto.GetAssetSkuReq{Id: req.AssetSkuId}) if err != nil { return nil, err } fields := make([]map[string]interface{}, 0) // Stock 字段在两种模式下都显示 fields = append(fields, map[string]interface{}{ "name": "stock", "label": "库存数量", "type": "number", "required": true, "min": 1, }) // 如果是批次模式(2),添加批次相关字段 if !g.IsEmpty(assetSku.StockMode) && assetSku.StockMode == stock.StockModeBatch { fields = append(fields, []map[string]interface{}{ { "name": "batchNo", "label": "批次号", "type": "number", "required": true, "default": gconv.Int(gtime.Now().Format("20060102") + "0001"), "maxLength": 12, }, { "name": "productionDate", "label": "生产日期", "type": "date", }, { "name": "expiryDate", "label": "过期日期", "type": "date", }, { "name": "expiryWarningDate", "label": "临期预警时间", "type": "date", }, }...) } return &stockDto.GetStockFormFieldsRes{ StockMode: assetSku.StockMode, Fields: fields, }, nil } // StockOperation 库存操作入口(入库/出库) // 根据SKU的StockMode区分明细模式和批次模式,计算差值后发布消息到NATS func (s *stockManage) StockOperation(ctx context.Context, req *stockDto.StockOperationReq) (err error) { // TODO 后续需要排一下查询应该不走缓存,否则会导致库存不准确 assetSku, err := assetDao.AssetSku.GetOne(ctx, &assetDto.GetAssetSkuReq{Id: req.AssetSkuId}) if err != nil { return } if !assetSku.UnlimitedStock && req.Stock >= 0 { var stockId int64 count := 0 if assetSku.StockMode == stock.StockModeDetail { detailsReq := &stockDto.GetSockDetailsReq{ AssetSkuId: assetSku.Id, Status: stock.StockStatusAvailable.Code(), } count, err = dao.StockDetails.Count(ctx, detailsReq) if err != nil { return err } } if assetSku.StockMode == stock.StockModeBatch { if g.IsEmpty(req.BatchNo) { return gerror.New("批次号不能为空") } getOne, err := dao.StockBatch.One(ctx, &stockDto.GetSockBatchReq{ BatchNo: req.BatchNo, }) if err != nil { return err } if !g.IsEmpty(getOne) { stockId = getOne.Id count = getOne.BatchQty } } stockCount := 0 operationType := "" if count != req.Stock { if count > req.Stock { stockCount = count - req.Stock operationType = "del" } else { stockCount = req.Stock - count operationType = "add" } } if !g.IsEmpty(operationType) && stockCount > 0 { if err = s.stockPublishMessage(ctx, assetSku, stockId, stockCount, operationType, req); err != nil { return err } } } return } // stockPublishMessage 发布库存变更消息到NATS // 消费者接收后执行实际的入库/出库操作(异步解耦) func (s *stockManage) stockPublishMessage(ctx context.Context, assetSku *assetEntity.AssetSku, stockId int64, stockCount int, operationType string, req *stockDto.StockOperationReq) (err error) { // 用户信息 user, err := utils.GetUserInfo(ctx) if err != nil { return } publishMessage := stockDto.StockPublishMessage{ AssetId: assetSku.AssetId, AssetSkuId: assetSku.Id, TenantId: user.TenantId, UserName: user.UserName, StockCount: stockCount, OperationType: operationType, Metadata: gconv.Maps(assetSku.SpecValues), StockMode: assetSku.StockMode, BatchNo: req.BatchNo, ProductionDate: req.ProductionDate, ExpiryDate: req.ExpiryDate, ExpiryWarningDate: req.ExpiryWarningDate, } if !g.IsEmpty(stockId) { publishMessage.StockId = stockId } err = gmq.GetGmq("primary").GmqPublish(ctx, &mq.RedisPubMessage{ PubMessage: types.PubMessage{ Topic: public.StockDetailQueueName, Data: publishMessage, }, }) return } // AddStock NATS消费者调用,执行实际的入库/出库操作 // 使用Redis分布式锁防止并发冲突,支持明细模式和批次模式 func (s *stockManage) AddStock(ctx context.Context, msg any) error { var req = new(stockDto.StockPublishMessage) msgMap := gconv.Map(msg) if err := gconv.Struct(msgMap["data"], &req); err != nil { return err } // 设置 userId 和 tenantId 到 ctx ctx = context.WithValue(ctx, "user", &beans.User{ UserName: req.UserName, TenantId: req.TenantId, }) // 获取redis-租户存储-锁key fileLockKey := fmt.Sprintf(public.StockDetailLockKey, req.AssetSkuId) success, err := utils.Lock(ctx, fileLockKey, int64(60), func(ctx context.Context) error { if req.OperationType == "add" { if req.StockMode == stock.StockModeBatch { if !g.IsEmpty(req.StockId) { var updateReq = new(stockDto.UpdateSockBatchReq) if err := gconv.Struct(req, &updateReq); err != nil { return err } if _, err := dao.StockBatch.Update(ctx, updateReq); err != nil { return err } } else { var createReq = new(stockDto.CreateSockBatchReq) if err := gconv.Struct(req, &createReq); err != nil { return err } if _, err := dao.StockBatch.Insert(ctx, createReq); err != nil { return err } } } if req.StockMode == stock.StockModeDetail { // 创建指定数量的库存 stockDetailsList := make([]*stockDto.CreateSockDetailsReq, req.StockCount) for i := 0; i < req.StockCount; i++ { var createReq = new(stockDto.CreateSockDetailsReq) if err := gconv.Struct(req, &createReq); err != nil { return err } stockDetailsList = append(stockDetailsList, createReq) } // 批量插入数据库 if _, err := dao.StockDetails.BatchInsert(ctx, stockDetailsList); err != nil { return err } } } if req.OperationType == "del" { if req.StockMode == stock.StockModeBatch { req.StockCount = 0 - req.StockCount // 更新批次 batch := stockDto.UpdateSockBatchReq{ Id: req.StockId, BatchQty: req.StockCount, AvailableQty: req.StockCount, } if _, err := dao.StockBatch.Update(ctx, &batch); err != nil { return err } } if req.StockMode == stock.StockModeDetail { // 分页查询所有库存明细,收集所有ID var allStockIds []stockDto.DeleteSockDetailsReq pageSize := int64(50) for pageNum := int64(1); ; pageNum++ { details, total, err := dao.StockDetails.List(ctx, &stockDto.GetSockDetailsReq{ AssetSkuId: req.AssetSkuId, Status: stock.StockStatusAvailable.Code(), Page: &beans.Page{PageNum: pageNum, PageSize: pageSize}, }) if err != nil { return err } if pageNum == 1 && total < req.StockCount { return gerror.New("可操作库存数量不足") } // 收集当前页的ID for _, detail := range details { if !g.IsEmpty(detail.Id) { allStockIds = append(allStockIds, stockDto.DeleteSockDetailsReq{Id: detail.Id}) if len(allStockIds) >= req.StockCount { break } } } if len(allStockIds) >= req.StockCount { break } } // 根据ID批量删除库存 delCount, err := dao.StockDetails.Delete(ctx, allStockIds) if err != nil { return err } if delCount != int64(req.StockCount) { return gerror.New("删除库存数量不匹配") } req.StockCount = 0 - req.StockCount } } _, err := assetDao.AssetSku.Update(ctx, &assetDto.UpdateAssetSkuReq{Id: req.AssetSkuId, Stock: req.StockCount}) return err }) if err != nil { return err } if !success { return fmt.Errorf("获取库存操作锁失败: %v", err) } return nil }