From e81338d2643f4005e01c769ff78de90b5a019de1 Mon Sep 17 00:00:00 2001 From: wanggang <76527413@qq.com> Date: Sat, 2 Sep 2023 13:59:06 +0800 Subject: [PATCH] update --- .../Entities/BQ/VmiAppService.cs | 30 ++--- .../Entities/BQ/VmiAsyncBalanceService.cs | 127 ++++++++++++++---- .../Entities/BQ/Vmi/VmiBalance.cs | 3 +- .../Entities/BQ/Vmi/VmiLog.cs | 3 +- .../Entities/BQ/Vmi/VmiMessage.cs | 3 +- 5 files changed, 124 insertions(+), 42 deletions(-) diff --git a/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAppService.cs b/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAppService.cs index eed95dee..e5790713 100644 --- a/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAppService.cs +++ b/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAppService.cs @@ -89,7 +89,7 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran { using var ms = new MemoryStream(); await files.FirstOrDefault().OpenReadStream().CopyToAsync(ms).ConfigureAwait(false); - return this.ImportInternal(ms.ToArray(), out var validationResults); + return this.ImportInternal(ms.ToArray()).Select(o => o.Item1).ToList(); } /// @@ -388,8 +388,8 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran var file = files.FirstOrDefault(); await file.OpenReadStream().CopyToAsync(ms).ConfigureAwait(false); var data = ms.ToArray(); - var list = this.ImportInternal(data, out var validationResults); - if (validationResults.Any(o => o.Count > 0)) + var tupleList = this.ImportInternal(data).Where(o => o.Item2.Count == 0).ToList(); + if (tupleList.Any(o => o.Item2.Count > 0)) { using var workbook = new XLWorkbook(new MemoryStream(data)); var ws = workbook.Worksheets.FirstOrDefault(); @@ -398,7 +398,7 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran header.Cell(errorIndex).Value = "提示信息"; for (int i = 0; i < ws.RowsUsed().Count() - 1; i++) { - ws.Row(i + 2).Cell(errorIndex).Value = string.Join(',', validationResults[i].Select(o => o.ErrorMessage)); + ws.Row(i + 2).Cell(errorIndex).Value = string.Join(',', tupleList[i].Item2.Select(o => o.ErrorMessage)); } SetStyle(ws); using var stream = new MemoryStream(); @@ -408,8 +408,9 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran await this._fileContainer.SaveAsync(fileName, stream, true).ConfigureAwait(false); return new JsonResult(new { code = 400, message = "输入异常", fileName }); } - list.ForEach(Update); - var messageList = list.Select(log => new VmiMessage { Message = JsonSerializer.Serialize(log) }).ToList(); + var logList = tupleList.Select(o => o.Item1).ToList(); + logList.AsParallel().ForEach(Update); + var messageList = logList.Select(log => new VmiMessage { Message = JsonSerializer.Serialize(log) }).ToList(); var connectionString = this._serviceProvider.GetRequiredService().GetConnectionString("SettleAccountService"); var options = new DbContextOptionsBuilder().UseSqlServer(connectionString).Options; using var context = new SettleAccountDbContext(options); @@ -418,7 +419,7 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran try { using var transaction = context.Database.BeginTransaction(); - await context.BulkInsertAsync(list).ConfigureAwait(false); + await context.BulkInsertAsync(logList).ConfigureAwait(false); await context.BulkInsertAsync(messageList).ConfigureAwait(false); transaction.Commit(); } @@ -441,10 +442,9 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran } } - private List ImportInternal(byte[] data, out List> validationResults) + private List>> ImportInternal(byte[] data) { - var list = new List(); - validationResults = new List>(); + var list = new List>>(); try { using var workbook = new XLWorkbook(new MemoryStream(data)); @@ -452,12 +452,13 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran .Where(o => o.GetAttributes().Any() || o.GetAttributes().Any()) .ToDictionary(o => o.GetAttribute()?.Name ?? o.GetAttribute()?.Name, o => o); var ws = workbook.Worksheets.FirstOrDefault(); - - for (int rowIndex = 2; rowIndex <= ws.RowsUsed().Count(); rowIndex++) + var rowsUsedCount = ws.RowsUsed().Count(); + var columnsUsedCount = ws.ColumnsUsed().Count(); + for (int rowIndex = 2; rowIndex <= rowsUsedCount; rowIndex++) { var row = ws.Row(rowIndex); var model = Activator.CreateInstance(); - for (var columnIndex = 1; columnIndex < ws.ColumnsUsed().Count(); columnIndex++) + for (var columnIndex = 1; columnIndex < columnsUsedCount; columnIndex++) { var cell = row.Cell(columnIndex); var headerName = ws.Cell(1, columnIndex).Value.ToString().Trim(); @@ -498,10 +499,9 @@ public class VmiAppService : Controller, IApplicationService, IJobService, ITran } } } - list.Add(model); var results = new List(); Validator.TryValidateObject(model, new ValidationContext(model, null, null), results); - validationResults.Add(results); + list.Add(new Tuple>(model, results)); } return list; } diff --git a/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncBalanceService.cs b/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncBalanceService.cs index 0ef9e04f..e348cb00 100644 --- a/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncBalanceService.cs +++ b/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncBalanceService.cs @@ -1,11 +1,12 @@ using System; using System.Collections.Generic; -using System.Data.SqlClient; using System.Diagnostics; using System.Linq; using System.Linq.Dynamic.Core; using System.Text.Json; +using System.Threading; using System.Threading.Tasks; +using EFCore.BulkExtensions; using Magicodes.ExporterAndImporter.Core.Extension; using Microsoft.AspNetCore.Mvc; using Microsoft.EntityFrameworkCore; @@ -29,6 +30,7 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi { private readonly IServiceProvider _serviceProvider; private readonly ILogger _logger; + private static SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1, 1); public VmiAsyncBalanceService(IServiceProvider serviceProvider, ILogger logger) { @@ -50,15 +52,30 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi [NonAction] public async Task Invoke(IServiceProvider serviceProvider) + { + await semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + await InvokeInternal(serviceProvider).ConfigureAwait(false); + } + finally + { + semaphoreSlim.Release(); + } + } + + private async Task InvokeInternal(IServiceProvider serviceProvider) { var batchSize = 1000; var fetchSize = 0; var connectionString = serviceProvider.GetRequiredService().GetConnectionString("SettleAccountService"); for (var i = 0; i < 1000; i++) { + var balanceList = new List>(); + var vmiReplenishedList = new List(); var sw = new Stopwatch(); sw.Start(); - using var connection = new SqlConnection(connectionString); + using var connection = new Microsoft.Data.SqlClient.SqlConnection(connectionString); connection.Open(); using var transaction = connection.BeginTransaction(); try @@ -72,20 +89,30 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi var vmiLogRepo = context.Set(); var vmiBalanceRepo = context.Set(); var vmiReplenishedRepo = context.Set(); - if (!vmiMessageRepo.Any(o => !o.isConsumed)) + //读取可消费消息列表 + var messages = vmiMessageRepo.AsNoTracking().Where(o => !o.isConsumed).OrderBy(o => o.Number).Take(batchSize).ToList(); + //没有可消费消息则返回 + if (!messages.Any()) { transaction.Commit(); break; } - var messages = vmiMessageRepo.Where(o => !o.isConsumed).OrderBy(o => o.Number).Take(batchSize).ToList(); - fetchSize=messages.Count; + //设置数量为实际返回数量 + fetchSize = messages.Count; + //反序列化获取库存事务 var list = messages.Select(o => { var log = JsonSerializer.Deserialize(o.Message); - log.SetId(Guid.Parse(JsonSerializer.Deserialize(o.Message).GetProperty("Id").GetString())); - return new KeyValuePair(o, log); - }).ToDictionary(o => o.Key, o => o.Value); - var tables = list.Values.Select(o => $"Set_VmiLog_{o.ChangedTime.Year}_{(o.ChangedTime.Month - 1) / 3 + 1}").Distinct().ToList(); + var jsonElement = JsonSerializer.Deserialize(o.Message); + var property = jsonElement.GetProperty("Id"); + Console.WriteLine(jsonElement); + var id = Guid.Parse(property.GetString()); + log.SetId(id); + return new LogToBalance { VmiMessage = o, Log = log, Table = $"Set_VmiLog_{log.ChangedTime.Year}_{(log.ChangedTime.Month - 1) / 3 + 1}" }; + }); + //获取分表名称 + var tables = list.Select(o => o.Table).Distinct().ToList(); + //创建分表 foreach (var table in tables) { command.CommandText = $"select OBJECT_ID('{table}', 'U')"; @@ -100,17 +127,26 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi command.ExecuteNonQuery(); } } - foreach (var keyValue in list) + foreach (var item in list) { - var message = keyValue.Key; - var log = keyValue.Value; - //获取分表名 - var table = $"Set_VmiLog_{log.ChangedTime.Year}_{(log.ChangedTime.Month - 1) / 3 + 1}"; - //插入到分表 - command.CommandText = $"insert into {table} select * from Set_VmiLog where id ='{log.Id}'"; - command.ExecuteNonQuery(); - //插入库存 - var balance = vmiBalanceRepo.FirstOrDefault( + var message = item.VmiMessage; + message.isConsumed = true; + var log = item.Log; + //本地查找 + var balance = balanceList.Select(o => o.Item1).FirstOrDefault( + o => o.DeliverBillType == log.DeliverBillType && + o.CodeType == log.CodeType && + o.DeliverBillType == log.DeliverBillType && + o.VinCode == log.VinCode && + o.ErpToLoc == log.ErpToLoc && + o.OrderNum == log.OrderNum && + o.factory == log.factory && + o.Configcode == log.Configcode); + + //数据库查找 + if (balance == null) + { + balance = vmiBalanceRepo.FirstOrDefault( o => o.DeliverBillType == log.DeliverBillType && o.CodeType == log.CodeType && o.DeliverBillType == log.DeliverBillType && @@ -119,10 +155,12 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi o.OrderNum == log.OrderNum && o.factory == log.factory && o.Configcode == log.Configcode); + } if (balance == null) {//不存在库存记录 - //新建库存记录 + //新建库存记录 balance = new VmiBalance(); + balanceList.Add(new Tuple(balance, 1)); if (log.LogType == VmiLogType.Type300) {//反结算入库,重建库存 var logHistory = vmiLogRepo.AsNoTracking().FirstOrDefault( @@ -149,10 +187,13 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi balance.InjectFrom(log); } balance.Qty = log.ChangedQty; - await vmiBalanceRepo.AddAsync(balance).ConfigureAwait(false); } else {//存在库存记录 + if (!balanceList.Any(o => o.Item1.Id == balance.Id)) + { + balanceList.Add(new Tuple(balance, 2)); + } var logType = log.LogType; var currentQty = balance.Qty;// + log.ty if (logType == VmiLogType.Type100) @@ -177,7 +218,7 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi //添加负库存补货记录 var log2 = new VmiReplenished(); log2.InjectFrom(log); - await vmiReplenishedRepo.AddAsync(log2).ConfigureAwait(false); + vmiReplenishedList.Add(log2); } // 更新库存 balance.Qty = currentQty + log.ChangedQty; @@ -187,9 +228,40 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi vmiBalanceRepo.Remove(balance); } } - message.isConsumed = true; - context.SaveChanges(); } + + //更新事务分表 + //foreach (var item in tables) + //{ + // var logs = list.Where(o => o.Table == item).Select(o => o.Log).ToList(); + // var xxx= logs.Select(o=>o.Id).Distinct().ToList(); + // await context.BulkInsertAsync(logs, new BulkConfig + // { + // CustomDestinationTableName = item, + // }).ConfigureAwait(false); + //} + //批量更新消息 + await context.BulkUpdateAsync(messages, new BulkConfig + { + PropertiesToExclude = new List { nameof(VmiMessage.Number) } + }).ConfigureAwait(false); + //批量插入负库存补货记录 + await context.BulkInsertAsync(vmiReplenishedList).ConfigureAwait(false); + //批量插入库存余额 + var addList = balanceList.Where(o => o.Item2 == 1 && o.Item1.Qty != decimal.Zero).Select(o => o.Item1).ToList(); + var test0 = addList.Count; + var test1 = addList.Select(o => o.Id).Distinct().Count(); + await context.BulkInsertAsync(balanceList.Where(o => o.Item2 == 1 && o.Item1.Qty != decimal.Zero).Select(o => o.Item1).ToList()).ConfigureAwait(false); + //批量更新库存余额 + var updateList = balanceList.Where(o => o.Item2 == 2 && o.Item1.Qty != decimal.Zero).Select(o => o.Item1).ToList(); + var test20 = updateList.Count; + var test21 = updateList.Select(o => o.Id).Distinct().Count(); + await context.BulkUpdateAsync(balanceList.Where(o => o.Item2 == 2 && o.Item1.Qty != decimal.Zero).Select(o => o.Item1).ToList()).ConfigureAwait(false); + //批量删除库存余额 + var deleteList = balanceList.Where(o => o.Item2 == 2 && o.Item1.Qty == decimal.Zero).Select(o => o.Item1).ToList(); + var test30 = addList.Count; + var test31 = addList.Select(o => o.Id).Distinct().Count(); + await context.BulkDeleteAsync(balanceList.Where(o => o.Item2 == 2 && o.Item1.Qty == decimal.Zero).Select(o => o.Item1).ToList()).ConfigureAwait(false); transaction.Commit(); } catch (Exception ex) @@ -205,3 +277,10 @@ public class VmiAsyncBalanceService : Controller, IApplicationService, IJobServi } } } + +public class LogToBalance +{ + public VmiMessage VmiMessage { get; set; } + public VmiLog Log { get; set; } + public string Table { get; set; } +} diff --git a/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiBalance.cs b/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiBalance.cs index be8a6c7c..1402c790 100644 --- a/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiBalance.cs +++ b/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiBalance.cs @@ -1,4 +1,5 @@ using System; +using SequentialGuid; namespace Win.Sfs.SettleAccount.Entities.BQ.Vmi; @@ -6,7 +7,7 @@ public class VmiBalance : VmiBalanceBase { public VmiBalance() { - Id = SequentialGuid.SequentialSqlGuidGenerator.Instance.NewGuid(); + Id = SequentialGuidGenerator.Instance.NewGuid(); CreatedTime = Id.ToDateTime().Value.ToLocalTime(); ConcurrencyStamp = Guid.NewGuid().ToString("N"); } diff --git a/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiLog.cs b/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiLog.cs index c5950b70..174bc5d9 100644 --- a/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiLog.cs +++ b/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiLog.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.ComponentModel.DataAnnotations; +using SequentialGuid; namespace Win.Sfs.SettleAccount.Entities.BQ.Vmi; @@ -11,7 +12,7 @@ public class VmiLog : VmiBalanceBase, IValidatableObject { public VmiLog() { - Id = SequentialGuid.SequentialSqlGuidGenerator.Instance.NewGuid(); + Id = SequentialGuidGenerator.Instance.NewGuid(); ChangedTime = Id.ToDateTime().Value.ToLocalTime(); ConcurrencyStamp = Guid.NewGuid().ToString("N"); } diff --git a/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiMessage.cs b/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiMessage.cs index ee10db06..7543ca33 100644 --- a/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiMessage.cs +++ b/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/VmiMessage.cs @@ -1,4 +1,5 @@ using System; +using SequentialGuid; using Volo.Abp.Domain.Entities; namespace Win.Sfs.SettleAccount.Entities.BQ.Vmi; @@ -7,7 +8,7 @@ public class VmiMessage : Entity { public VmiMessage() { - Id = SequentialGuid.SequentialSqlGuidGenerator.Instance.NewGuid(); + Id = SequentialGuidGenerator.Instance.NewGuid(); CreatedTime = Id.ToDateTime().Value.ToLocalTime(); ConcurrencyStamp = Guid.NewGuid().ToString("N"); }