Files
assets/service/stock/stock_manage_service.go

319 lines
10 KiB
Go
Raw Permalink Normal View History

2026-03-18 10:18:03 +08:00
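// Stock management service for Stock (the public inventory).
// Responsibilities: stock-in / stock-out operations, supporting detail mode (StockDetails) and batch mode (StockBatch).
// Call chain: Controller → StockOperation → stockPublishMessage → NATS → AddStock (consumer).
// Tightly coupled with: dao.StockDetails, dao.StockBatch, dao.AssetSku (updates the stock count), common/message (NATS publishing).
// Note: stock moves and transfers are PrivateStock-only operations and are not implemented here.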
package service
import (
"assets/consts/public"
"assets/consts/stock"
assetDao "assets/dao/asset"
dao "assets/dao/stock"
assetDto "assets/model/dto/asset"
stockDto "assets/model/dto/stock"
assetEntity "assets/model/entity/asset"
entity "assets/model/entity/stock"
"context"
"fmt"
"gitea.com/red-future/common/beans"
"gitea.com/red-future/common/redis"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"go.mongodb.org/mongo-driver/v2/bson"
)
// stockManage is the receiver type for the public ("Stock") inventory service.
// It has no fields.
type stockManage struct{}

// StockManage is the package-level instance of the public stock service.
//
// Responsibilities:
//  1. Stock-in / stock-out for detail mode (StockDetails) and batch mode (StockBatch).
//  2. Building the form fields used by stock operations.
//
// Note: MoveStock and TransferStock are PrivateStock-only operations; do not
// implement them here.
var StockManage = &stockManage{}
// GetStockFormFields 获取库存操作表单字段
func (s *stockManage) GetStockFormFields(ctx context.Context, req *stockDto.GetStockFormFieldsReq) (*stockDto.GetStockFormFieldsRes, error) {
// 获取资产SKU信息
assetSku, err := assetDao.AssetSku.GetOne(ctx, &assetDto.GetAssetSkuReq{Id: req.AssetSkuId})
2026-03-18 10:18:03 +08:00
if err != nil {
return nil, err
}
fields := make([]map[string]interface{}, 0)
// Stock 字段在两种模式下都显示
fields = append(fields, map[string]interface{}{
"name": "stock",
"label": "库存数量",
"type": "number",
"required": true,
"min": 1,
})
// 如果是批次模式(2),添加批次相关字段
if !g.IsEmpty(assetSku.StockMode) && assetSku.StockMode == stock.StockModeBatch {
fields = append(fields, []map[string]interface{}{
{
"name": "batchNo",
"label": "批次号",
"type": "number",
"required": true,
"default": gconv.Int(gtime.Now().Format("20060102") + "0001"),
"maxLength": 12,
},
{
"name": "productionDate",
"label": "生产日期",
"type": "date",
},
{
"name": "expiryDate",
"label": "过期日期",
"type": "date",
},
{
"name": "expiryWarningDate",
"label": "临期预警时间",
"type": "date",
},
}...)
}
return &stockDto.GetStockFormFieldsRes{
StockMode: assetSku.StockMode,
Fields: fields,
}, nil
}
// StockOperation 库存操作入口(入库/出库)
// 根据SKU的StockMode区分明细模式和批次模式计算差值后发布消息到NATS
func (s *stockManage) StockOperation(ctx context.Context, req *stockDto.StockOperationReq) (err error) {
assetSku, err := assetDao.AssetSku.GetOne(ctx, &assetDto.GetAssetSkuReq{Id: req.AssetSkuId})
2026-03-18 10:18:03 +08:00
if err != nil {
return
}
if !assetSku.UnlimitedStock && req.Stock >= 0 {
var stockId *bson.ObjectID
count := 0
if assetSku.StockMode == stock.StockModeDetail {
_count, err := dao.StockDetails.GetStockCountBySkuId(ctx, assetSku.Id)
if err != nil {
return err
}
count = gconv.Int(_count)
}
if assetSku.StockMode == stock.StockModeBatch {
if g.IsEmpty(req.BatchNo) {
return gerror.New("批次号不能为空")
}
getOne, err := dao.StockBatch.GetOne(ctx, req.BatchNo)
if err != nil {
return err
}
if !g.IsEmpty(getOne) {
stockId = getOne.Id
count = getOne.BatchQty
}
}
stockCount := 0
operationType := ""
if count != req.Stock {
if count > req.Stock {
stockCount = count - req.Stock
operationType = "del"
} else {
stockCount = req.Stock - count
operationType = "add"
}
}
if !g.IsEmpty(operationType) && stockCount > 0 {
if err = s.stockPublishMessage(ctx, assetSku, stockId, stockCount, operationType, req); err != nil {
return err
}
}
}
return
}
// stockPublishMessage 发布库存变更消息到NATS
// 消费者接收后执行实际的入库/出库操作(异步解耦)
func (s *stockManage) stockPublishMessage(ctx context.Context, assetSku *assetEntity.AssetSku, stockId *bson.ObjectID, stockCount int, operationType string, req *stockDto.StockOperationReq) (err error) {
// 用户信息
user, err := utils.GetUserInfo(ctx)
if err != nil {
return
}
publishMessage := stockDto.StockPublishMessage{
AssetId: assetSku.AssetId,
AssetSkuId: assetSku.Id,
2026-03-18 10:18:03 +08:00
TenantId: user.TenantId,
UserName: user.UserName,
StockCount: stockCount,
OperationType: operationType,
Metadata: gconv.Maps(assetSku.SpecValues),
2026-03-18 10:18:03 +08:00
StockMode: int(assetSku.StockMode),
BatchNo: req.BatchNo,
ProductionDate: req.ProductionDate,
ExpiryDate: req.ExpiryDate,
ExpiryWarningDate: req.ExpiryWarningDate,
}
if !g.IsEmpty(stockId) && !stockId.IsZero() {
publishMessage.StockId = stockId.Hex()
}
// 发布到 NATS
//plugin, err := message.GetMsgPlugin(ctx, message.MessageNATS)
//if err != nil {
// return gerror.Newf("NATS插件未就绪: %v", err)
//}
//err = plugin.Publish(ctx, &message.NatsPublishMsgConfig{
// QueueName: public.StockDetailGroupName,
// Durable: true,
// Data: publishMessage,
//})
//_, err = message.PublishMessage(ctx, &message.RedisMessageConfig{StreamKey: public.StockDetailStreamKey}, publishMessage)
//plugin, err := message.GetMsgPlugin(message.MessageRedis)
//if err != nil {
// return err
//}
//err = plugin.Publish(ctx, &message.RedisPublishMsgConfig{
// QueueName: public.StockDetailQueueName,
// Data: publishMessage,
//})
return
}
// AddStock NATS消费者调用执行实际的入库/出库操作
// 使用Redis分布式锁防止并发冲突支持明细模式和批次模式
func (s *stockManage) AddStock(ctx context.Context, msg map[string]interface{}) error {
assetId := gconv.Int64(msg["assetId"])
assetSkuId := gconv.Int64(msg["assetSkuId"])
stockId := gconv.Int64(msg["stockId"])
2026-03-18 10:18:03 +08:00
userName := gconv.String(msg["userName"])
tenantId := gconv.Float64(msg["tenantId"])
stockCount := gconv.Int(msg["stockCount"])
operationType := gconv.String(msg["operationType"])
metadata := gconv.Maps(msg["metadata"])
stockMode := stock.StockMode(gconv.Int(msg["stockMode"]))
batchNo := gconv.String(msg["batchNo"])
productionDate := gtime.New(msg["productionDate"])
expiryDate := gtime.New(msg["expiryDate"])
expiryWarningDate := gtime.New(msg["expiryWarningDate"])
// 设置 userId 和 tenantId 到 ctx
ctx = context.WithValue(ctx, "userName", userName)
ctx = context.WithValue(ctx, "tenantId", tenantId)
// 获取redis-租户存储-锁key
fileLockKey := fmt.Sprintf(public.StockDetailLockKey, assetSkuId)
success, err := redis.Lock(ctx, fileLockKey, int64(60), func(ctx context.Context) error {
if operationType == "add" {
if stockMode == stock.StockModeBatch {
if !g.IsEmpty(stockId) {
2026-03-18 10:18:03 +08:00
batch := stockDto.UpdateBatchReq{
Id: stockId,
2026-03-18 10:18:03 +08:00
BatchQty: stockCount,
AvailableQty: stockCount,
}
if err := dao.StockBatch.Update(ctx, &batch); err != nil {
return err
}
} else {
batch := stockDto.CreateBatchReq{
AssetId: assetId,
AssetSkuId: assetSkuId,
2026-03-18 10:18:03 +08:00
Status: stock.BatchStatusActive,
Metadata: metadata,
BatchNo: batchNo,
BatchQty: stockCount,
AvailableQty: stockCount,
ProductionDate: productionDate,
ExpiryDate: expiryDate,
ExpiryWarningDate: expiryWarningDate,
}
if _, err := dao.StockBatch.Insert(ctx, &batch); err != nil {
return err
}
}
}
if stockMode == stock.StockModeDetail {
// 创建指定数量的库存
var stockInterfaces []interface{}
for i := 0; i < stockCount; i++ {
stockInterfaces = append(stockInterfaces, entity.StockDetails{
AssetId: assetId,
AssetSkuId: assetSkuId,
2026-03-18 10:18:03 +08:00
Status: stock.StockStatusAvailable,
Metadata: metadata,
})
}
// 批量插入数据库
if _, err := dao.StockDetails.BatchInsert(ctx, stockInterfaces); err != nil {
2026-03-18 10:18:03 +08:00
return err
}
}
}
if operationType == "del" {
if stockMode == stock.StockModeBatch {
stockCount = 0 - stockCount
// 更新批次
batch := stockDto.UpdateBatchReq{
Id: stockId,
2026-03-18 10:18:03 +08:00
BatchQty: stockCount,
AvailableQty: stockCount,
}
if err := dao.StockBatch.Update(ctx, &batch); err != nil {
return err
}
}
if stockMode == stock.StockModeDetail {
// 分页查询所有库存明细收集所有ID
var allStockIds []*bson.ObjectID
pageSize := int64(50)
for pageNum := int64(1); ; pageNum++ {
details, total, err := dao.StockDetails.List(ctx,
&stockDto.ListStockDetailsReq{
AssetSkuId: assetSkuId,
2026-03-18 10:18:03 +08:00
Status: stock.StockStatusAvailable,
Page: &beans.Page{PageNum: pageNum, PageSize: pageSize},
})
if err != nil {
return err
}
if pageNum == 1 && int(total) < stockCount {
return gerror.New("可操作库存数量不足")
}
// 收集当前页的ID
for _, detail := range details {
if detail.Id != nil && !detail.Id.IsZero() {
allStockIds = append(allStockIds, detail.Id)
if len(allStockIds) >= stockCount {
break
}
}
}
if len(allStockIds) >= stockCount {
break
}
}
// 根据ID批量删除库存
delCount, err := dao.StockDetails.DeleteManyByIds(ctx, allStockIds)
if err != nil {
return err
}
if delCount != int64(stockCount) {
return gerror.New("删除库存数量不匹配")
}
stockCount = 0 - stockCount
}
}
_, err := assetDao.AssetSku.Update(ctx, &assetDto.UpdateAssetSkuReq{Id: assetSkuId, Stock: stockCount})
return err
2026-03-18 10:18:03 +08:00
})
if err != nil {
return err
}
if !success {
return fmt.Errorf("获取库存操作锁失败: %v", err)
}
return nil
}