Browse Source

update

master
wanggang 1 year ago
parent
commit
e81338d264
  1. 30
      code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAppService.cs
  2. 127
      code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncBalanceService.cs
  3. 3
      code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiBalance.cs
  4. 3
      code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiLog.cs
  5. 3
      code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiMessage.cs

30
code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAppService.cs

@ -89,7 +89,7 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran
{ {
using var ms = new MemoryStream(); using var ms = new MemoryStream();
await files.FirstOrDefault().OpenReadStream().CopyToAsync(ms).ConfigureAwait(false); await files.FirstOrDefault().OpenReadStream().CopyToAsync(ms).ConfigureAwait(false);
return this.ImportInternal<PUB_ADJ_DETAIL_IMP_DTO>(ms.ToArray(), out var validationResults); return this.ImportInternal<PUB_ADJ_DETAIL_IMP_DTO>(ms.ToArray()).Select(o => o.Item1).ToList();
} }
/// <summary> /// <summary>
@ -388,8 +388,8 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran
var file = files.FirstOrDefault(); var file = files.FirstOrDefault();
await file.OpenReadStream().CopyToAsync(ms).ConfigureAwait(false); await file.OpenReadStream().CopyToAsync(ms).ConfigureAwait(false);
var data = ms.ToArray(); var data = ms.ToArray();
var list = this.ImportInternal<VmiLog>(data, out var validationResults); var tupleList = this.ImportInternal<VmiLog>(data).Where(o => o.Item2.Count == 0).ToList();
if (validationResults.Any(o => o.Count > 0)) if (tupleList.Any(o => o.Item2.Count > 0))
{ {
using var workbook = new XLWorkbook(new MemoryStream(data)); using var workbook = new XLWorkbook(new MemoryStream(data));
var ws = workbook.Worksheets.FirstOrDefault(); var ws = workbook.Worksheets.FirstOrDefault();
@ -398,7 +398,7 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran
header.Cell(errorIndex).Value = "提示信息"; header.Cell(errorIndex).Value = "提示信息";
for (int i = 0; i < ws.RowsUsed().Count() - 1; i++) for (int i = 0; i < ws.RowsUsed().Count() - 1; i++)
{ {
ws.Row(i + 2).Cell(errorIndex).Value = string.Join(',', validationResults[i].Select(o => o.ErrorMessage)); ws.Row(i + 2).Cell(errorIndex).Value = string.Join(',', tupleList[i].Item2.Select(o => o.ErrorMessage));
} }
SetStyle(ws); SetStyle(ws);
using var stream = new MemoryStream(); using var stream = new MemoryStream();
@ -408,8 +408,9 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran
await this._fileContainer.SaveAsync(fileName, stream, true).ConfigureAwait(false); await this._fileContainer.SaveAsync(fileName, stream, true).ConfigureAwait(false);
return new JsonResult(new { code = 400, message = "输入异常", fileName }); return new JsonResult(new { code = 400, message = "输入异常", fileName });
} }
list.ForEach(Update); var logList = tupleList.Select(o => o.Item1).ToList();
var messageList = list.Select(log => new VmiMessage { Message = JsonSerializer.Serialize(log) }).ToList(); logList.AsParallel().ForEach(Update);
var messageList = logList.Select(log => new VmiMessage { Message = JsonSerializer.Serialize(log) }).ToList();
var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService");
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connectionString).Options; var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connectionString).Options;
using var context = new SettleAccountDbContext(options); using var context = new SettleAccountDbContext(options);
@ -418,7 +419,7 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran
try try
{ {
using var transaction = context.Database.BeginTransaction(); using var transaction = context.Database.BeginTransaction();
await context.BulkInsertAsync(list).ConfigureAwait(false); await context.BulkInsertAsync(logList).ConfigureAwait(false);
await context.BulkInsertAsync(messageList).ConfigureAwait(false); await context.BulkInsertAsync(messageList).ConfigureAwait(false);
transaction.Commit(); transaction.Commit();
} }
@ -441,10 +442,9 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran
} }
} }
private List<T> ImportInternal<T>(byte[] data, out List<List<ValidationResult>> validationResults) private List<Tuple<T, List<ValidationResult>>> ImportInternal<T>(byte[] data)
{ {
var list = new List<T>(); var list = new List<Tuple<T, List<ValidationResult>>>();
validationResults = new List<List<ValidationResult>>();
try try
{ {
using var workbook = new XLWorkbook(new MemoryStream(data)); using var workbook = new XLWorkbook(new MemoryStream(data));
@ -452,12 +452,13 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran
.Where(o => o.GetAttributes<ImporterHeaderAttribute>().Any() || o.GetAttributes<DisplayAttribute>().Any()) .Where(o => o.GetAttributes<ImporterHeaderAttribute>().Any() || o.GetAttributes<DisplayAttribute>().Any())
.ToDictionary(o => o.GetAttribute<ImporterHeaderAttribute>()?.Name ?? o.GetAttribute<DisplayAttribute>()?.Name, o => o); .ToDictionary(o => o.GetAttribute<ImporterHeaderAttribute>()?.Name ?? o.GetAttribute<DisplayAttribute>()?.Name, o => o);
var ws = workbook.Worksheets.FirstOrDefault(); var ws = workbook.Worksheets.FirstOrDefault();
var rowsUsedCount = ws.RowsUsed().Count();
for (int rowIndex = 2; rowIndex <= ws.RowsUsed().Count(); rowIndex++) var columnsUsedCount = ws.ColumnsUsed().Count();
for (int rowIndex = 2; rowIndex <= rowsUsedCount; rowIndex++)
{ {
var row = ws.Row(rowIndex); var row = ws.Row(rowIndex);
var model = Activator.CreateInstance<T>(); var model = Activator.CreateInstance<T>();
for (var columnIndex = 1; columnIndex < ws.ColumnsUsed().Count(); columnIndex++) for (var columnIndex = 1; columnIndex < columnsUsedCount; columnIndex++)
{ {
var cell = row.Cell(columnIndex); var cell = row.Cell(columnIndex);
var headerName = ws.Cell(1, columnIndex).Value.ToString().Trim(); var headerName = ws.Cell(1, columnIndex).Value.ToString().Trim();
@ -498,10 +499,9 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran
} }
} }
} }
list.Add(model);
var results = new List<ValidationResult>(); var results = new List<ValidationResult>();
Validator.TryValidateObject(model, new ValidationContext(model, null, null), results); Validator.TryValidateObject(model, new ValidationContext(model, null, null), results);
validationResults.Add(results); list.Add(new Tuple<T, List<ValidationResult>>(model, results));
} }
return list; return list;
} }

127
code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncBalanceService.cs

@ -1,11 +1,12 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Data.SqlClient;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Linq.Dynamic.Core; using System.Linq.Dynamic.Core;
using System.Text.Json; using System.Text.Json;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using EFCore.BulkExtensions;
using Magicodes.ExporterAndImporter.Core.Extension; using Magicodes.ExporterAndImporter.Core.Extension;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
@ -29,6 +30,7 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi
{ {
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private readonly ILogger<VmiAsyncBalanceService> _logger; private readonly ILogger<VmiAsyncBalanceService> _logger;
private static SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1, 1);
public VmiAsyncBalanceService(IServiceProvider serviceProvider, ILogger<VmiAsyncBalanceService> logger) public VmiAsyncBalanceService(IServiceProvider serviceProvider, ILogger<VmiAsyncBalanceService> logger)
{ {
@ -50,15 +52,30 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi
[NonAction] [NonAction]
public async Task Invoke(IServiceProvider serviceProvider) public async Task Invoke(IServiceProvider serviceProvider)
{
await semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
await InvokeInternal(serviceProvider).ConfigureAwait(false);
}
finally
{
semaphoreSlim.Release();
}
}
private async Task InvokeInternal(IServiceProvider serviceProvider)
{ {
var batchSize = 1000; var batchSize = 1000;
var fetchSize = 0; var fetchSize = 0;
var connectionString = serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); var connectionString = serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService");
for (var i = 0; i < 1000; i++) for (var i = 0; i < 1000; i++)
{ {
var balanceList = new List<Tuple<VmiBalance, int>>();
var vmiReplenishedList = new List<VmiReplenished>();
var sw = new Stopwatch(); var sw = new Stopwatch();
sw.Start(); sw.Start();
using var connection = new SqlConnection(connectionString); using var connection = new Microsoft.Data.SqlClient.SqlConnection(connectionString);
connection.Open(); connection.Open();
using var transaction = connection.BeginTransaction(); using var transaction = connection.BeginTransaction();
try try
@ -72,20 +89,30 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi
var vmiLogRepo = context.Set<VmiLog>(); var vmiLogRepo = context.Set<VmiLog>();
var vmiBalanceRepo = context.Set<VmiBalance>(); var vmiBalanceRepo = context.Set<VmiBalance>();
var vmiReplenishedRepo = context.Set<VmiReplenished>(); var vmiReplenishedRepo = context.Set<VmiReplenished>();
if (!vmiMessageRepo.Any(o => !o.isConsumed)) //读取可消费消息列表
var messages = vmiMessageRepo.AsNoTracking().Where(o => !o.isConsumed).OrderBy(o => o.Number).Take(batchSize).ToList();
//没有可消费消息则返回
if (!messages.Any())
{ {
transaction.Commit(); transaction.Commit();
break; break;
} }
var messages = vmiMessageRepo.Where(o => !o.isConsumed).OrderBy(o => o.Number).Take(batchSize).ToList(); //设置数量为实际返回数量
fetchSize=messages.Count; fetchSize = messages.Count;
//反序列化获取库存事务
var list = messages.Select(o => var list = messages.Select(o =>
{ {
var log = JsonSerializer.Deserialize<VmiLog>(o.Message); var log = JsonSerializer.Deserialize<VmiLog>(o.Message);
log.SetId(Guid.Parse(JsonSerializer.Deserialize<JsonElement>(o.Message).GetProperty("Id").GetString())); var jsonElement = JsonSerializer.Deserialize<JsonElement>(o.Message);
return new KeyValuePair<VmiMessage, VmiLog>(o, log); var property = jsonElement.GetProperty("Id");
}).ToDictionary(o => o.Key, o => o.Value); Console.WriteLine(jsonElement);
var tables = list.Values.Select(o => $"Set_VmiLog_{o.ChangedTime.Year}_{(o.ChangedTime.Month - 1) / 3 + 1}").Distinct().ToList(); var id = Guid.Parse(property.GetString());
log.SetId(id);
return new LogToBalance { VmiMessage = o, Log = log, Table = $"Set_VmiLog_{log.ChangedTime.Year}_{(log.ChangedTime.Month - 1) / 3 + 1}" };
});
//获取分表名称
var tables = list.Select(o => o.Table).Distinct().ToList();
//创建分表
foreach (var table in tables) foreach (var table in tables)
{ {
command.CommandText = $"select OBJECT_ID('{table}', 'U')"; command.CommandText = $"select OBJECT_ID('{table}', 'U')";
@ -100,17 +127,26 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi
command.ExecuteNonQuery(); command.ExecuteNonQuery();
} }
} }
foreach (var keyValue in list) foreach (var item in list)
{ {
var message = keyValue.Key; var message = item.VmiMessage;
var log = keyValue.Value; message.isConsumed = true;
//获取分表名 var log = item.Log;
var table = $"Set_VmiLog_{log.ChangedTime.Year}_{(log.ChangedTime.Month - 1) / 3 + 1}"; //本地查找
//插入到分表 var balance = balanceList.Select(o => o.Item1).FirstOrDefault(
command.CommandText = $"insert into {table} select * from Set_VmiLog where id ='{log.Id}'"; o => o.DeliverBillType == log.DeliverBillType &&
command.ExecuteNonQuery(); o.CodeType == log.CodeType &&
//插入库存 o.DeliverBillType == log.DeliverBillType &&
var balance = vmiBalanceRepo.FirstOrDefault( o.VinCode == log.VinCode &&
o.ErpToLoc == log.ErpToLoc &&
o.OrderNum == log.OrderNum &&
o.factory == log.factory &&
o.Configcode == log.Configcode);
//数据库查找
if (balance == null)
{
balance = vmiBalanceRepo.FirstOrDefault(
o => o.DeliverBillType == log.DeliverBillType && o => o.DeliverBillType == log.DeliverBillType &&
o.CodeType == log.CodeType && o.CodeType == log.CodeType &&
o.DeliverBillType == log.DeliverBillType && o.DeliverBillType == log.DeliverBillType &&
@ -119,10 +155,12 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi
o.OrderNum == log.OrderNum && o.OrderNum == log.OrderNum &&
o.factory == log.factory && o.factory == log.factory &&
o.Configcode == log.Configcode); o.Configcode == log.Configcode);
}
if (balance == null) if (balance == null)
{//不存在库存记录 {//不存在库存记录
//新建库存记录 //新建库存记录
balance = new VmiBalance(); balance = new VmiBalance();
balanceList.Add(new Tuple<VmiBalance, int>(balance, 1));
if (log.LogType == VmiLogType.Type300) if (log.LogType == VmiLogType.Type300)
{//反结算入库,重建库存 {//反结算入库,重建库存
var logHistory = vmiLogRepo.AsNoTracking().FirstOrDefault( var logHistory = vmiLogRepo.AsNoTracking().FirstOrDefault(
@ -149,10 +187,13 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi
balance.InjectFrom(log); balance.InjectFrom(log);
} }
balance.Qty = log.ChangedQty; balance.Qty = log.ChangedQty;
await vmiBalanceRepo.AddAsync(balance).ConfigureAwait(false);
} }
else else
{//存在库存记录 {//存在库存记录
if (!balanceList.Any(o => o.Item1.Id == balance.Id))
{
balanceList.Add(new Tuple<VmiBalance, int>(balance, 2));
}
var logType = log.LogType; var logType = log.LogType;
var currentQty = balance.Qty;// + log.ty var currentQty = balance.Qty;// + log.ty
if (logType == VmiLogType.Type100) if (logType == VmiLogType.Type100)
@ -177,7 +218,7 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi
//添加负库存补货记录 //添加负库存补货记录
var log2 = new VmiReplenished(); var log2 = new VmiReplenished();
log2.InjectFrom(log); log2.InjectFrom(log);
await vmiReplenishedRepo.AddAsync(log2).ConfigureAwait(false); vmiReplenishedList.Add(log2);
} }
// 更新库存 // 更新库存
balance.Qty = currentQty + log.ChangedQty; balance.Qty = currentQty + log.ChangedQty;
@ -187,9 +228,40 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi
vmiBalanceRepo.Remove(balance); vmiBalanceRepo.Remove(balance);
} }
} }
message.isConsumed = true;
context.SaveChanges();
} }
//更新事务分表
//foreach (var item in tables)
//{
// var logs = list.Where(o => o.Table == item).Select(o => o.Log).ToList();
// var xxx= logs.Select(o=>o.Id).Distinct().ToList();
// await context.BulkInsertAsync(logs, new BulkConfig
// {
// CustomDestinationTableName = item,
// }).ConfigureAwait(false);
//}
//批量更新消息
await context.BulkUpdateAsync(messages, new BulkConfig
{
PropertiesToExclude = new List<string> { nameof(VmiMessage.Number) }
}).ConfigureAwait(false);
//批量插入负库存补货记录
await context.BulkInsertAsync(vmiReplenishedList).ConfigureAwait(false);
//批量插入库存余额
var addList = balanceList.Where(o => o.Item2 == 1 && o.Item1.Qty != decimal.Zero).Select(o => o.Item1).ToList();
var test0 = addList.Count;
var test1 = addList.Select(o => o.Id).Distinct().Count();
await context.BulkInsertAsync(balanceList.Where(o => o.Item2 == 1 && o.Item1.Qty != decimal.Zero).Select(o => o.Item1).ToList()).ConfigureAwait(false);
//批量更新库存余额
var updateList = balanceList.Where(o => o.Item2 == 2 && o.Item1.Qty != decimal.Zero).Select(o => o.Item1).ToList();
var test20 = updateList.Count;
var test21 = updateList.Select(o => o.Id).Distinct().Count();
await context.BulkUpdateAsync(balanceList.Where(o => o.Item2 == 2 && o.Item1.Qty != decimal.Zero).Select(o => o.Item1).ToList()).ConfigureAwait(false);
//批量删除库存余额
var deleteList = balanceList.Where(o => o.Item2 == 2 && o.Item1.Qty == decimal.Zero).Select(o => o.Item1).ToList();
var test30 = addList.Count;
var test31 = addList.Select(o => o.Id).Distinct().Count();
await context.BulkDeleteAsync(balanceList.Where(o => o.Item2 == 2 && o.Item1.Qty == decimal.Zero).Select(o => o.Item1).ToList()).ConfigureAwait(false);
transaction.Commit(); transaction.Commit();
} }
catch (Exception ex) catch (Exception ex)
@ -205,3 +277,10 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi
} }
} }
} }
public class LogToBalance
{
public VmiMessage VmiMessage { get; set; }
public VmiLog Log { get; set; }
public string Table { get; set; }
}

3
code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiBalance.cs

@ -1,4 +1,5 @@
using System; using System;
using SequentialGuid;
namespace Win.Sfs.SettleAccount.Entities.BQ.Vmi; namespace Win.Sfs.SettleAccount.Entities.BQ.Vmi;
@ -6,7 +7,7 @@ public class VmiBalance : VmiBalanceBase
{ {
public VmiBalance() public VmiBalance()
{ {
Id = SequentialGuid.SequentialSqlGuidGenerator.Instance.NewGuid(); Id = SequentialGuidGenerator.Instance.NewGuid();
CreatedTime = Id.ToDateTime().Value.ToLocalTime(); CreatedTime = Id.ToDateTime().Value.ToLocalTime();
ConcurrencyStamp = Guid.NewGuid().ToString("N"); ConcurrencyStamp = Guid.NewGuid().ToString("N");
} }

3
code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiLog.cs

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.ComponentModel.DataAnnotations; using System.ComponentModel.DataAnnotations;
using SequentialGuid;
namespace Win.Sfs.SettleAccount.Entities.BQ.Vmi; namespace Win.Sfs.SettleAccount.Entities.BQ.Vmi;
@ -11,7 +12,7 @@ public class VmiLog : VmiBalanceBase, IValidatableObject
{ {
public VmiLog() public VmiLog()
{ {
Id = SequentialGuid.SequentialSqlGuidGenerator.Instance.NewGuid(); Id = SequentialGuidGenerator.Instance.NewGuid();
ChangedTime = Id.ToDateTime().Value.ToLocalTime(); ChangedTime = Id.ToDateTime().Value.ToLocalTime();
ConcurrencyStamp = Guid.NewGuid().ToString("N"); ConcurrencyStamp = Guid.NewGuid().ToString("N");
} }

3
code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiMessage.cs

@ -1,4 +1,5 @@
using System; using System;
using SequentialGuid;
using Volo.Abp.Domain.Entities; using Volo.Abp.Domain.Entities;
namespace Win.Sfs.SettleAccount.Entities.BQ.Vmi; namespace Win.Sfs.SettleAccount.Entities.BQ.Vmi;
@ -7,7 +8,7 @@ public class VmiMessage : Entity<Guid>
{ {
public VmiMessage() public VmiMessage()
{ {
Id = SequentialGuid.SequentialSqlGuidGenerator.Instance.NewGuid(); Id = SequentialGuidGenerator.Instance.NewGuid();
CreatedTime = Id.ToDateTime().Value.ToLocalTime(); CreatedTime = Id.ToDateTime().Value.ToLocalTime();
ConcurrencyStamp = Guid.NewGuid().ToString("N"); ConcurrencyStamp = Guid.NewGuid().ToString("N");
} }

Loading…
Cancel
Save