296 lines
9.1 KiB
Go
296 lines
9.1 KiB
Go
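// Package service implements the stock management service (Stock, i.e. the shared/public stock).
//
// Responsibilities: stock-in / stock-out operations, supporting both detail mode
// (StockDetails) and batch mode (StockBatch).
//
// Call chain: Controller → StockOperation → stockPublishMessage → NATS → AddStock (consumer).
// NOTE(review): the comments call the transport NATS, but the publish call in this
// file sends an mq.RedisPubMessage through gmq — confirm which broker is actually used.
//
// Tightly coupled with: dao.StockDetails, dao.StockBatch, dao.AssetSku (updates the
// stock count) and common/message (message publishing).
//
// Note: moving and transferring stock are PrivateStock-only operations and are not
// implemented here.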
|
||
package service
|
||
|
||
import (
|
||
"assets/consts/public"
|
||
"assets/consts/stock"
|
||
assetDao "assets/dao/asset"
|
||
dao "assets/dao/stock"
|
||
assetDto "assets/model/dto/asset"
|
||
stockDto "assets/model/dto/stock"
|
||
assetEntity "assets/model/entity/asset"
|
||
"context"
|
||
"fmt"
|
||
|
||
"gitea.com/red-future/common/beans"
|
||
"gitea.com/red-future/common/utils"
|
||
gmq "github.com/bjang03/gmq/core/gmq"
|
||
"github.com/bjang03/gmq/mq"
|
||
"github.com/bjang03/gmq/types"
|
||
"github.com/gogf/gf/v2/errors/gerror"
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/os/gtime"
|
||
"github.com/gogf/gf/v2/util/gconv"
|
||
)
|
||
|
||
// stockManage is the field-less receiver type that carries the public-stock
// service methods.
type stockManage struct{}

// StockManage is the package-level instance of the stock management service
// (Stock, the shared/public stock).
//
// Responsibilities:
//  1. Stock-in / stock-out for detail mode (StockDetails) and batch mode (StockBatch).
//  2. Building the form fields used by the stock operation form.
//
// Note: moving stock (MoveStock) and transferring stock (TransferStock) are
// PrivateStock-only operations; do not implement them here.
var StockManage = &stockManage{}
|
||
|
||
// GetStockFormFields 获取库存操作表单字段
|
||
func (s *stockManage) GetStockFormFields(ctx context.Context, req *stockDto.GetStockFormFieldsReq) (*stockDto.GetStockFormFieldsRes, error) {
|
||
// 获取资产SKU信息
|
||
assetSku, err := assetDao.AssetSku.GetOne(ctx, &assetDto.GetAssetSkuReq{Id: req.AssetSkuId})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
fields := make([]map[string]interface{}, 0)
|
||
// Stock 字段在两种模式下都显示
|
||
fields = append(fields, map[string]interface{}{
|
||
"name": "stock",
|
||
"label": "库存数量",
|
||
"type": "number",
|
||
"required": true,
|
||
"min": 1,
|
||
})
|
||
// 如果是批次模式(2),添加批次相关字段
|
||
if !g.IsEmpty(assetSku.StockMode) && assetSku.StockMode == stock.StockModeBatch {
|
||
fields = append(fields, []map[string]interface{}{
|
||
{
|
||
"name": "batchNo",
|
||
"label": "批次号",
|
||
"type": "number",
|
||
"required": true,
|
||
"default": gconv.Int(gtime.Now().Format("20060102") + "0001"),
|
||
"maxLength": 12,
|
||
},
|
||
{
|
||
"name": "productionDate",
|
||
"label": "生产日期",
|
||
"type": "date",
|
||
},
|
||
{
|
||
"name": "expiryDate",
|
||
"label": "过期日期",
|
||
"type": "date",
|
||
},
|
||
{
|
||
"name": "expiryWarningDate",
|
||
"label": "临期预警时间",
|
||
"type": "date",
|
||
},
|
||
}...)
|
||
}
|
||
return &stockDto.GetStockFormFieldsRes{
|
||
StockMode: assetSku.StockMode,
|
||
Fields: fields,
|
||
}, nil
|
||
}
|
||
|
||
// StockOperation 库存操作入口(入库/出库)
|
||
// 根据SKU的StockMode区分明细模式和批次模式,计算差值后发布消息到NATS
|
||
func (s *stockManage) StockOperation(ctx context.Context, req *stockDto.StockOperationReq) (err error) {
|
||
// TODO 后续需要排一下查询应该不走缓存,否则会导致库存不准确
|
||
assetSku, err := assetDao.AssetSku.GetOne(ctx, &assetDto.GetAssetSkuReq{Id: req.AssetSkuId})
|
||
if err != nil {
|
||
return
|
||
}
|
||
if !assetSku.UnlimitedStock && req.Stock >= 0 {
|
||
var stockId int64
|
||
count := 0
|
||
if assetSku.StockMode == stock.StockModeDetail {
|
||
detailsReq := &stockDto.GetSockDetailsReq{
|
||
AssetSkuId: assetSku.Id,
|
||
Status: stock.StockStatusAvailable.Code(),
|
||
}
|
||
count, err = dao.StockDetails.Count(ctx, detailsReq)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
if assetSku.StockMode == stock.StockModeBatch {
|
||
if g.IsEmpty(req.BatchNo) {
|
||
return gerror.New("批次号不能为空")
|
||
}
|
||
getOne, err := dao.StockBatch.One(ctx, &stockDto.GetSockBatchReq{
|
||
BatchNo: req.BatchNo,
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if !g.IsEmpty(getOne) {
|
||
stockId = getOne.Id
|
||
count = getOne.BatchQty
|
||
}
|
||
}
|
||
stockCount := 0
|
||
operationType := ""
|
||
if count != req.Stock {
|
||
if count > req.Stock {
|
||
stockCount = count - req.Stock
|
||
operationType = "del"
|
||
} else {
|
||
stockCount = req.Stock - count
|
||
operationType = "add"
|
||
}
|
||
}
|
||
if !g.IsEmpty(operationType) && stockCount > 0 {
|
||
if err = s.stockPublishMessage(ctx, assetSku, stockId, stockCount, operationType, req); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
}
|
||
return
|
||
}
|
||
|
||
// stockPublishMessage 发布库存变更消息到NATS
|
||
// 消费者接收后执行实际的入库/出库操作(异步解耦)
|
||
func (s *stockManage) stockPublishMessage(ctx context.Context, assetSku *assetEntity.AssetSku, stockId int64, stockCount int, operationType string, req *stockDto.StockOperationReq) (err error) {
|
||
// 用户信息
|
||
user, err := utils.GetUserInfo(ctx)
|
||
if err != nil {
|
||
return
|
||
}
|
||
publishMessage := stockDto.StockPublishMessage{
|
||
AssetId: assetSku.AssetId,
|
||
AssetSkuId: assetSku.Id,
|
||
TenantId: user.TenantId,
|
||
UserName: user.UserName,
|
||
StockCount: stockCount,
|
||
OperationType: operationType,
|
||
Metadata: gconv.Maps(assetSku.SpecValues),
|
||
StockMode: assetSku.StockMode,
|
||
BatchNo: req.BatchNo,
|
||
ProductionDate: req.ProductionDate,
|
||
ExpiryDate: req.ExpiryDate,
|
||
ExpiryWarningDate: req.ExpiryWarningDate,
|
||
}
|
||
if !g.IsEmpty(stockId) {
|
||
publishMessage.StockId = stockId
|
||
}
|
||
err = gmq.GetGmq("primary").GmqPublish(ctx, &mq.RedisPubMessage{
|
||
PubMessage: types.PubMessage{
|
||
Topic: public.StockDetailQueueName,
|
||
Data: publishMessage,
|
||
},
|
||
})
|
||
return
|
||
}
|
||
|
||
// AddStock NATS消费者调用,执行实际的入库/出库操作
|
||
// 使用Redis分布式锁防止并发冲突,支持明细模式和批次模式
|
||
func (s *stockManage) AddStock(ctx context.Context, msg any) error {
|
||
var req = new(stockDto.StockPublishMessage)
|
||
msgMap := gconv.Map(msg)
|
||
if err := gconv.Struct(msgMap["data"], &req); err != nil {
|
||
return err
|
||
}
|
||
// 设置 userId 和 tenantId 到 ctx
|
||
ctx = context.WithValue(ctx, "user", &beans.User{
|
||
UserName: req.UserName,
|
||
TenantId: req.TenantId,
|
||
})
|
||
// 获取redis-租户存储-锁key
|
||
fileLockKey := fmt.Sprintf(public.StockDetailLockKey, req.AssetSkuId)
|
||
success, err := utils.Lock(ctx, fileLockKey, int64(60), func(ctx context.Context) error {
|
||
if req.OperationType == "add" {
|
||
if req.StockMode == stock.StockModeBatch {
|
||
if !g.IsEmpty(req.StockId) {
|
||
var updateReq = new(stockDto.UpdateSockBatchReq)
|
||
if err := gconv.Struct(req, &updateReq); err != nil {
|
||
return err
|
||
}
|
||
if _, err := dao.StockBatch.Update(ctx, updateReq); err != nil {
|
||
return err
|
||
}
|
||
} else {
|
||
var createReq = new(stockDto.CreateSockBatchReq)
|
||
if err := gconv.Struct(req, &createReq); err != nil {
|
||
return err
|
||
}
|
||
if _, err := dao.StockBatch.Insert(ctx, createReq); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
}
|
||
if req.StockMode == stock.StockModeDetail {
|
||
// 创建指定数量的库存
|
||
stockDetailsList := make([]*stockDto.CreateSockDetailsReq, req.StockCount)
|
||
for i := 0; i < req.StockCount; i++ {
|
||
var createReq = new(stockDto.CreateSockDetailsReq)
|
||
if err := gconv.Struct(req, &createReq); err != nil {
|
||
return err
|
||
}
|
||
stockDetailsList = append(stockDetailsList, createReq)
|
||
}
|
||
// 批量插入数据库
|
||
if _, err := dao.StockDetails.BatchInsert(ctx, stockDetailsList); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
}
|
||
if req.OperationType == "del" {
|
||
if req.StockMode == stock.StockModeBatch {
|
||
req.StockCount = 0 - req.StockCount
|
||
// 更新批次
|
||
batch := stockDto.UpdateSockBatchReq{
|
||
Id: req.StockId,
|
||
BatchQty: req.StockCount,
|
||
AvailableQty: req.StockCount,
|
||
}
|
||
if _, err := dao.StockBatch.Update(ctx, &batch); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
if req.StockMode == stock.StockModeDetail {
|
||
// 分页查询所有库存明细,收集所有ID
|
||
var allStockIds []stockDto.DeleteSockDetailsReq
|
||
pageSize := int64(50)
|
||
for pageNum := int64(1); ; pageNum++ {
|
||
details, total, err := dao.StockDetails.List(ctx,
|
||
&stockDto.GetSockDetailsReq{
|
||
AssetSkuId: req.AssetSkuId,
|
||
Status: stock.StockStatusAvailable.Code(),
|
||
Page: &beans.Page{PageNum: pageNum, PageSize: pageSize},
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if pageNum == 1 && total < req.StockCount {
|
||
return gerror.New("可操作库存数量不足")
|
||
}
|
||
// 收集当前页的ID
|
||
for _, detail := range details {
|
||
if !g.IsEmpty(detail.Id) {
|
||
allStockIds = append(allStockIds, stockDto.DeleteSockDetailsReq{Id: detail.Id})
|
||
if len(allStockIds) >= req.StockCount {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
if len(allStockIds) >= req.StockCount {
|
||
break
|
||
}
|
||
}
|
||
// 根据ID批量删除库存
|
||
delCount, err := dao.StockDetails.Delete(ctx, allStockIds)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if delCount != int64(req.StockCount) {
|
||
return gerror.New("删除库存数量不匹配")
|
||
}
|
||
req.StockCount = 0 - req.StockCount
|
||
}
|
||
}
|
||
_, err := assetDao.AssetSku.Update(ctx, &assetDto.UpdateAssetSkuReq{Id: req.AssetSkuId, Stock: req.StockCount})
|
||
return err
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if !success {
|
||
return fmt.Errorf("获取库存操作锁失败: %v", err)
|
||
}
|
||
return nil
|
||
}
|