refactor: 重构资产实体和DTO结构类型

将gjson.Json类型替换为具体的结构体和map类型,修正DAO层链式调用,启用SKU元数据校验逻辑
This commit is contained in:
2026-03-22 20:08:32 +08:00
parent 34a1ba79b6
commit 829dc07747
36 changed files with 932 additions and 793 deletions

View File

@@ -4,57 +4,48 @@
// 注意区别于PrivateStock的实物库存批次库存是逻辑概念不记录物理位置
package service
import (
dao "assets/dao/stock"
dto "assets/model/dto/stock"
"context"
"gitea.com/red-future/common/utils"
"go.mongodb.org/mongo-driver/v2/bson"
)
type stockBatch struct{}
// StockBatch 批次服务
var StockBatch = new(stockBatch)
func (s *stockBatch) Create(ctx context.Context, req *dto.CreateBatchReq) (res *dto.CreateBatchRes, err error) {
ids, err := dao.StockBatch.Insert(ctx, req)
if err != nil {
return
}
id := ids[0].(bson.ObjectID)
res = &dto.CreateBatchRes{
Id: &id,
}
return
}
func (s *stockBatch) Update(ctx context.Context, req *dto.UpdateBatchReq) error {
return dao.StockBatch.Update(ctx, req)
}
func (s *stockBatch) Delete(ctx context.Context, req *dto.DeleteBatchReq) error {
return dao.StockBatch.DeleteFake(ctx, req)
}
func (s *stockBatch) GetOne(ctx context.Context, req *dto.GetBatchReq) (res *dto.GetBatchRes, err error) {
one, err := dao.StockBatch.GetOneById(ctx, req)
if err != nil {
return
}
err = utils.Struct(one, &res)
return
}
func (s *stockBatch) List(ctx context.Context, req *dto.ListBatchReq) (res *dto.ListBatchRes, err error) {
list, total, err := dao.StockBatch.List(ctx, req)
if err != nil {
return
}
res = &dto.ListBatchRes{
Total: total,
}
err = utils.Struct(list, &res.List)
return
}
//func (s *stockBatch) Create(ctx context.Context, req *dto.CreateBatchReq) (res *dto.CreateBatchRes, err error) {
// ids, err := dao.StockBatch.Insert(ctx, req)
// if err != nil {
// return
// }
// id := ids[0].(bson.ObjectID)
// res = &dto.CreateBatchRes{
// Id: &id,
// }
// return
//}
//
//func (s *stockBatch) Update(ctx context.Context, req *dto.UpdateBatchReq) error {
// return dao.StockBatch.Update(ctx, req)
//}
//
//func (s *stockBatch) Delete(ctx context.Context, req *dto.DeleteBatchReq) error {
// return dao.StockBatch.Delete(ctx, req)
//}
//
//func (s *stockBatch) GetOne(ctx context.Context, req *dto.GetBatchReq) (res *dto.GetBatchRes, err error) {
// one, err := dao.StockBatch.GetOneById(ctx, req)
// if err != nil {
// return
// }
// err = utils.Struct(one, &res)
// return
//}
//
//func (s *stockBatch) List(ctx context.Context, req *dto.ListBatchReq) (res *dto.ListBatchRes, err error) {
// list, total, err := dao.StockBatch.List(ctx, req)
// if err != nil {
// return
// }
// res = &dto.ListBatchRes{
// Total: total,
// }
// err = utils.Struct(list, &res.List)
// return
//}

View File

@@ -4,36 +4,28 @@
// 注意区别于PrivateStock的实物库存明细库存是逻辑概念不记录物理位置
package service
import (
dao "assets/dao/stock"
dto "assets/model/dto/stock"
"context"
"gitea.com/red-future/common/utils"
)
type stockDetails struct{}
// StockDetails 库存服务
var StockDetails = new(stockDetails)
func (s *stockDetails) GetOne(ctx context.Context, req *dto.GetStockDetailsReq) (res *dto.GetStockDetailsRes, err error) {
one, err := dao.StockDetails.GetOneById(ctx, req)
if err != nil {
return
}
err = utils.Struct(one, &res)
return
}
func (s *stockDetails) List(ctx context.Context, req *dto.ListStockDetailsReq) (res *dto.ListStockDetailsRes, err error) {
list, total, err := dao.StockDetails.List(ctx, req)
if err != nil {
return
}
res = &dto.ListStockDetailsRes{
Total: total,
}
err = utils.Struct(list, &res.List)
return
}
//func (s *stockDetails) GetOne(ctx context.Context, req *dto.GetStockDetailsReq) (res *dto.GetStockDetailsRes, err error) {
// one, err := dao.StockDetails.GetOne(ctx, req)
// if err != nil {
// return
// }
// err = utils.Struct(one, &res)
// return
//}
//
//func (s *stockDetails) List(ctx context.Context, req *dto.ListStockDetailsReq) (res *dto.ListStockDetailsRes, err error) {
// list, total, err := dao.StockDetails.List(ctx, req)
// if err != nil {
// return
// }
// res = &dto.ListStockDetailsRes{
// Total: total,
// }
// err = utils.Struct(list, &res.List)
// return
//}

View File

@@ -1,4 +1,4 @@
// 库存管理服务Stock公共库存
// Package service 库存管理服务Stock公共库存
// 职责:入库/出库操作,支持明细模式(StockDetails)和批次模式(StockBatch)
// 调用链Controller → StockOperation → stockPublishMessage → NATS → AddStock(消费者)
// 紧密耦合dao.StockDetails、dao.StockBatch、dao.AssetSku(更新库存数)、common/message(NATS发布)
@@ -13,18 +13,19 @@ import (
assetDto "assets/model/dto/asset"
stockDto "assets/model/dto/stock"
assetEntity "assets/model/entity/asset"
entity "assets/model/entity/stock"
"context"
"fmt"
"gitea.com/red-future/common/beans"
"gitea.com/red-future/common/redis"
"gitea.com/red-future/common/utils"
gmq "github.com/bjang03/gmq/core/gmq"
"github.com/bjang03/gmq/mq"
"github.com/bjang03/gmq/types"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"go.mongodb.org/mongo-driver/v2/bson"
)
type stockManage struct{}
@@ -94,20 +95,25 @@ func (s *stockManage) StockOperation(ctx context.Context, req *stockDto.StockOpe
return
}
if !assetSku.UnlimitedStock && req.Stock >= 0 {
var stockId *bson.ObjectID
var stockId int64
count := 0
if assetSku.StockMode == stock.StockModeDetail {
_count, err := dao.StockDetails.GetStockCountBySkuId(ctx, assetSku.Id)
detailsReq := &stockDto.GetSockDetailsReq{
AssetSkuId: assetSku.Id,
Status: stock.StockStatusAvailable.Code(),
}
count, err = dao.StockDetails.Count(ctx, detailsReq)
if err != nil {
return err
}
count = gconv.Int(_count)
}
if assetSku.StockMode == stock.StockModeBatch {
if g.IsEmpty(req.BatchNo) {
return gerror.New("批次号不能为空")
}
getOne, err := dao.StockBatch.GetOne(ctx, req.BatchNo)
getOne, err := dao.StockBatch.One(ctx, &stockDto.GetSockBatchReq{
BatchNo: req.BatchNo,
})
if err != nil {
return err
}
@@ -138,7 +144,7 @@ func (s *stockManage) StockOperation(ctx context.Context, req *stockDto.StockOpe
// stockPublishMessage 发布库存变更消息到NATS
// 消费者接收后执行实际的入库/出库操作(异步解耦)
func (s *stockManage) stockPublishMessage(ctx context.Context, assetSku *assetEntity.AssetSku, stockId *bson.ObjectID, stockCount int, operationType string, req *stockDto.StockOperationReq) (err error) {
func (s *stockManage) stockPublishMessage(ctx context.Context, assetSku *assetEntity.AssetSku, stockId int64, stockCount int, operationType string, req *stockDto.StockOperationReq) (err error) {
// 用户信息
user, err := utils.GetUserInfo(ctx)
if err != nil {
@@ -152,160 +158,128 @@ func (s *stockManage) stockPublishMessage(ctx context.Context, assetSku *assetEn
StockCount: stockCount,
OperationType: operationType,
Metadata: gconv.Maps(assetSku.SpecValues),
StockMode: int(assetSku.StockMode),
StockMode: assetSku.StockMode,
BatchNo: req.BatchNo,
ProductionDate: req.ProductionDate,
ExpiryDate: req.ExpiryDate,
ExpiryWarningDate: req.ExpiryWarningDate,
}
if !g.IsEmpty(stockId) && !stockId.IsZero() {
publishMessage.StockId = stockId.Hex()
if !g.IsEmpty(stockId) {
publishMessage.StockId = stockId
}
// 发布到 NATS
//plugin, err := message.GetMsgPlugin(ctx, message.MessageNATS)
//if err != nil {
// return gerror.Newf("NATS插件未就绪: %v", err)
//}
//err = plugin.Publish(ctx, &message.NatsPublishMsgConfig{
// QueueName: public.StockDetailGroupName,
// Durable: true,
// Data: publishMessage,
//})
//_, err = message.PublishMessage(ctx, &message.RedisMessageConfig{StreamKey: public.StockDetailStreamKey}, publishMessage)
//plugin, err := message.GetMsgPlugin(message.MessageRedis)
//if err != nil {
// return err
//}
//err = plugin.Publish(ctx, &message.RedisPublishMsgConfig{
// QueueName: public.StockDetailQueueName,
// Data: publishMessage,
//})
err = gmq.GetGmq("primary").GmqPublish(ctx, &mq.RedisPubMessage{
PubMessage: types.PubMessage{
Topic: public.StockDetailQueueName,
Data: publishMessage,
},
})
return
}
// AddStock NATS消费者调用执行实际的入库/出库操作
// 使用Redis分布式锁防止并发冲突支持明细模式和批次模式
func (s *stockManage) AddStock(ctx context.Context, msg map[string]interface{}) error {
assetId := gconv.Int64(msg["assetId"])
assetSkuId := gconv.Int64(msg["assetSkuId"])
stockId := gconv.Int64(msg["stockId"])
userName := gconv.String(msg["userName"])
tenantId := gconv.Float64(msg["tenantId"])
stockCount := gconv.Int(msg["stockCount"])
operationType := gconv.String(msg["operationType"])
metadata := gconv.Maps(msg["metadata"])
stockMode := stock.StockMode(gconv.Int(msg["stockMode"]))
batchNo := gconv.String(msg["batchNo"])
productionDate := gtime.New(msg["productionDate"])
expiryDate := gtime.New(msg["expiryDate"])
expiryWarningDate := gtime.New(msg["expiryWarningDate"])
func (s *stockManage) AddStock(ctx context.Context, msg any) error {
var req = new(stockDto.StockPublishMessage)
if err := gconv.Struct(msg, &req); err != nil {
return err
}
// 设置 userId 和 tenantId 到 ctx
ctx = context.WithValue(ctx, "userName", userName)
ctx = context.WithValue(ctx, "tenantId", tenantId)
ctx = context.WithValue(ctx, "userName", req.UserName)
ctx = context.WithValue(ctx, "tenantId", req.TenantId)
// 获取redis-租户存储-锁key
fileLockKey := fmt.Sprintf(public.StockDetailLockKey, assetSkuId)
fileLockKey := fmt.Sprintf(public.StockDetailLockKey, req.AssetSkuId)
success, err := redis.Lock(ctx, fileLockKey, int64(60), func(ctx context.Context) error {
if operationType == "add" {
if stockMode == stock.StockModeBatch {
if !g.IsEmpty(stockId) {
batch := stockDto.UpdateBatchReq{
Id: stockId,
BatchQty: stockCount,
AvailableQty: stockCount,
if req.OperationType == "add" {
if req.StockMode == stock.StockModeBatch {
if !g.IsEmpty(req.StockId) {
var updateReq = new(stockDto.UpdateSockBatchReq)
if err := gconv.Struct(req, &updateReq); err != nil {
return err
}
if err := dao.StockBatch.Update(ctx, &batch); err != nil {
if _, err := dao.StockBatch.Update(ctx, updateReq); err != nil {
return err
}
} else {
batch := stockDto.CreateBatchReq{
AssetId: assetId,
AssetSkuId: assetSkuId,
Status: stock.BatchStatusActive,
Metadata: metadata,
BatchNo: batchNo,
BatchQty: stockCount,
AvailableQty: stockCount,
ProductionDate: productionDate,
ExpiryDate: expiryDate,
ExpiryWarningDate: expiryWarningDate,
var createReq = new(stockDto.CreateSockBatchReq)
if err := gconv.Struct(req, &createReq); err != nil {
return err
}
if _, err := dao.StockBatch.Insert(ctx, &batch); err != nil {
if _, err := dao.StockBatch.Insert(ctx, createReq); err != nil {
return err
}
}
}
if stockMode == stock.StockModeDetail {
if req.StockMode == stock.StockModeDetail {
// 创建指定数量的库存
var stockInterfaces []interface{}
for i := 0; i < stockCount; i++ {
stockInterfaces = append(stockInterfaces, entity.StockDetails{
AssetId: assetId,
AssetSkuId: assetSkuId,
Status: stock.StockStatusAvailable,
Metadata: metadata,
})
stockDetailsList := make([]*stockDto.CreateSockDetailsReq, req.StockCount)
for i := 0; i < req.StockCount; i++ {
var createReq = new(stockDto.CreateSockDetailsReq)
if err := gconv.Struct(req, &createReq); err != nil {
return err
}
stockDetailsList = append(stockDetailsList, createReq)
}
// 批量插入数据库
if _, err := dao.StockDetails.BatchInsert(ctx, stockInterfaces); err != nil {
if _, err := dao.StockDetails.BatchInsert(ctx, stockDetailsList); err != nil {
return err
}
}
}
if operationType == "del" {
if stockMode == stock.StockModeBatch {
stockCount = 0 - stockCount
if req.OperationType == "del" {
if req.StockMode == stock.StockModeBatch {
req.StockCount = 0 - req.StockCount
// 更新批次
batch := stockDto.UpdateBatchReq{
Id: stockId,
BatchQty: stockCount,
AvailableQty: stockCount,
batch := stockDto.UpdateSockBatchReq{
Id: req.StockId,
BatchQty: req.StockCount,
AvailableQty: req.StockCount,
}
if err := dao.StockBatch.Update(ctx, &batch); err != nil {
if _, err := dao.StockBatch.Update(ctx, &batch); err != nil {
return err
}
}
if stockMode == stock.StockModeDetail {
if req.StockMode == stock.StockModeDetail {
// 分页查询所有库存明细收集所有ID
var allStockIds []*bson.ObjectID
var allStockIds []stockDto.DeleteSockDetailsReq
pageSize := int64(50)
for pageNum := int64(1); ; pageNum++ {
details, total, err := dao.StockDetails.List(ctx,
&stockDto.ListStockDetailsReq{
AssetSkuId: assetSkuId,
Status: stock.StockStatusAvailable,
&stockDto.GetSockDetailsReq{
AssetSkuId: req.AssetSkuId,
Status: stock.StockStatusAvailable.Code(),
Page: &beans.Page{PageNum: pageNum, PageSize: pageSize},
})
if err != nil {
return err
}
if pageNum == 1 && int(total) < stockCount {
if pageNum == 1 && total < req.StockCount {
return gerror.New("可操作库存数量不足")
}
// 收集当前页的ID
for _, detail := range details {
if detail.Id != nil && !detail.Id.IsZero() {
allStockIds = append(allStockIds, detail.Id)
if len(allStockIds) >= stockCount {
if !g.IsEmpty(detail.Id) {
allStockIds = append(allStockIds, stockDto.DeleteSockDetailsReq{Id: detail.Id})
if len(allStockIds) >= req.StockCount {
break
}
}
}
if len(allStockIds) >= stockCount {
if len(allStockIds) >= req.StockCount {
break
}
}
// 根据ID批量删除库存
delCount, err := dao.StockDetails.DeleteManyByIds(ctx, allStockIds)
delCount, err := dao.StockDetails.Delete(ctx, allStockIds)
if err != nil {
return err
}
if delCount != int64(stockCount) {
if delCount != int64(req.StockCount) {
return gerror.New("删除库存数量不匹配")
}
stockCount = 0 - stockCount
req.StockCount = 0 - req.StockCount
}
}
_, err := assetDao.AssetSku.Update(ctx, &assetDto.UpdateAssetSkuReq{Id: assetSkuId, Stock: stockCount})
_, err := assetDao.AssetSku.Update(ctx, &assetDto.UpdateAssetSkuReq{Id: req.AssetSkuId, Stock: req.StockCount})
return err
})
if err != nil {