diff --git a/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAppService.cs b/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAppService.cs index ac5a1577..4321d181 100644 --- a/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAppService.cs +++ b/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAppService.cs @@ -24,6 +24,7 @@ using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Omu.ValueInjecter; using RestSharp.Extensions; using SettleAccount.Job.SignalR; using SqlSugar; @@ -489,6 +490,195 @@ namespace Win.Sfs.SettleAccount.Entities.BQ } } + /// + /// 库存初始化导入,临时使用 + /// + /// + [HttpPost] + public async Task ImportForInit(List files) + { + try + { + using var ms = new MemoryStream(); + var file = files.FirstOrDefault(); + await file.OpenReadStream().CopyToAsync(ms).ConfigureAwait(false); + var data = ms.ToArray(); + var tupleList = this.ImportInternal(data); + if (tupleList.Any(o => o.Item2.Count > 0)) + { + using var workbook = new XLWorkbook(new MemoryStream(data)); + var ws = workbook.Worksheets.FirstOrDefault(); + var header = ws.Row(1); + var errorIndex = ws.ColumnsUsed().Count() + 1; + header.Cell(errorIndex).Value = "提示信息"; + for (int i = 0; i < ws.RowsUsed().Count() - 1; i++) + { + ws.Row(i + 2).Cell(errorIndex).Value = string.Join(',', tupleList[i].Item2.Select(o => o.ErrorMessage)); + } + SetStyle(ws); + using var stream = new MemoryStream(); + workbook.SaveAs(stream); + stream.Seek(0, SeekOrigin.Begin); + var fileName = $"{file.Name}_错误信息.xlsx"; + await this._fileContainer.SaveAsync(fileName, stream, true).ConfigureAwait(false); + return new JsonResult(new { code = 400, message = "输入异常", fileName }); + } + var logList = tupleList.Select(o => o.Item1).ToList(); + logList.AsParallel().ForEach(Update); + var messageList = logList.Select(log => new VmiMessage { Message = JsonSerializer.Serialize(log) }).ToList(); + var connectionString = this._serviceProvider.GetRequiredService().GetConnectionString("SettleAccountService"); + var options = new DbContextOptionsBuilder().UseSqlServer(connectionString).Options; + using var context = new SettleAccountDbContext(options); + var st = new Stopwatch(); + st.Start(); + try + { + //导入日志和消息 + using var transaction = context.Database.BeginTransaction(); + await context.BulkInsertAsync(logList).ConfigureAwait(false); + await context.BulkInsertAsync(messageList).ConfigureAwait(false); + transaction.Commit(); + //手动消费入库 + await this.InvokeInternal(this._serviceProvider); + } + catch (Exception ex) + { + _logger.LogError(ex.ToString()); + return new JsonResult(new { code = 500, message = ex.ToString() }); + } + finally + { + st.Stop(); + this._logger.LogInformation($"事务结束,耗时 ${st.ElapsedMilliseconds / 1000 / 60}分钟"); + } + return new JsonResult(new { code = 200, message = "ok" }); + } + catch (Exception ex) + { + this._logger.LogError(ex.ToString()); + return new JsonResult(new { code = 500, data = ex.ToString(), message = ex.Message }); ; + } + } + + private async Task InvokeInternal(IServiceProvider serviceProvider) + { + var batchSize = 10000; + var fetchSize = 0; + var connectionString = serviceProvider.GetRequiredService().GetConnectionString("SettleAccountService"); + while(true) + { + var balanceList = new List>(); + var sw = new Stopwatch(); + sw.Start(); + using var connection = new Microsoft.Data.SqlClient.SqlConnection(connectionString); + connection.Open(); + using var transaction = connection.BeginTransaction(); + try + { + var command = connection.CreateCommand(); + command.Transaction = transaction; + var options = new DbContextOptionsBuilder().UseSqlServer(connection).Options; + using var context = new SettleAccountDbContext(options); + context.Database.UseTransaction(transaction); + var vmiMessageRepo = context.Set(); + var vmiLogRepo = context.Set(); + //读取可消费消息列表 + var messages = vmiMessageRepo.AsNoTracking().Where(o => !o.isConsumed).OrderBy(o => o.Number).Take(batchSize).ToList(); + //没有可消费消息则返回 + if (!messages.Any()) + { + transaction.Commit(); + break; + } + //设置数量为实际返回数量 + fetchSize = messages.Count; + //反序列化获取库存事务 + var list = messages.Select(o => + { + var log = JsonSerializer.Deserialize(o.Message); + var jsonElement = JsonSerializer.Deserialize(o.Message); + var property = jsonElement.GetProperty("Id"); + Console.WriteLine(jsonElement); + var id = Guid.Parse(property.GetString()); + log.SetId(id); + return new LogToBalance { VmiMessage = o, Log = log, Table = $"Set_VmiLog_{log.ChangedTime.Year}_{(log.ChangedTime.Month - 1) / 3 + 1}" }; + }); + //获取分表名称 + var tables = list.Select(o => o.Table).Distinct().ToList(); + //创建分表 + foreach (var table in tables) + { + command.CommandText = $"select OBJECT_ID('{table}', 'U')"; + var result = command.ExecuteScalar().ToString(); + if (result == string.Empty) + { + command.CommandText = $"select * into {table} from Set_VmiLog where 1=0;"; + command.ExecuteNonQuery(); + command.CommandText = $"alter table {table} add constraint PK_{table} primary key (Id);"; + command.ExecuteNonQuery(); + } + } + foreach (var item in list) + { + var message = item.VmiMessage; + //message.isConsumed = true; + var log = item.Log; + var balance = new VmiBalance(); + balanceList.Add(new Tuple(balance, 1)); + if (log.LogType == VmiLogType.Type300) + {//反结算入库,重建库存 + var logHistory = vmiLogRepo.AsNoTracking().FirstOrDefault( + o => o.LogType == VmiLogType.Type100 && + o.DeliverBillType == log.DeliverBillType && + o.CodeType == log.CodeType && + o.RealPartCode == log.RealPartCode && + o.VinCode == log.VinCode && + o.ErpToLoc == log.ErpToLoc && + o.OrderNum == log.OrderNum && + o.factory == log.factory && + o.Configcode == log.Configcode); + if (logHistory != null) + { + balance.InjectFrom(logHistory); + } + else + { + balance.InjectFrom(log); + } + } + else + { + balance.InjectFrom(log); + } + balance.Qty = log.ChangedQty; + } + + //更新事务分表 + foreach (var item in tables) + { + var logs = list.Where(o => o.Table == item).Select(o => o.Log).ToList(); + await context.BulkCopyAsync(new LinqToDB.Data.BulkCopyOptions { TableName = item }, logs).ConfigureAwait(false); + } + //批量更新消息 + await context.BulkDeleteAsync(messages).ConfigureAwait(false); + //批量插入库存余额 + await context.BulkInsertAsync(balanceList.Where(o => o.Item2 == 1 && o.Item1.Qty != decimal.Zero).Select(o => o.Item1).ToList()).ConfigureAwait(false); + transaction.Commit(); + } + catch (Exception ex) + { + this._logger.LogError(ex.ToString()); + throw new UserFriendlyException(ex.ToString(), "500"); + } + finally + { + sw.Stop(); + this._logger.LogInformation($"处理{fetchSize}条,耗时 {sw.ElapsedMilliseconds / 1000 / 60}分钟,{sw.ElapsedMilliseconds / 1000}秒"); + } + } + } + + private List>> ImportInternal(byte[] data) { var list = new List>>(); diff --git a/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncMessageService.cs b/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncMessageService.cs index 127dfb40..436291b9 100644 --- a/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncMessageService.cs +++ b/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncMessageService.cs @@ -1,10 +1,12 @@ using System; +using System.Data.SqlClient; using System.Linq; using System.Linq.Dynamic.Core; using System.Threading.Tasks; using EFCore.BulkExtensions; using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using SettleAccount.Job.SignalR; using Volo.Abp.Application.Services; @@ -29,9 +31,12 @@ namespace Win.Sfs.SettleAccount.Entities.BQ public Task Invoke(IServiceProvider serviceProvider) { using var scope = serviceProvider.CreateScope(); - var db = scope.ServiceProvider.GetRequiredService(); - db.Set().Where(o => o.isConsumed).BatchDelete(); - var count = db.Set().Where(o => !o.isConsumed).Count(); + var connectionString = serviceProvider.GetRequiredService().GetConnectionString("SettleAccountService"); + using var connection = new SqlConnection(connectionString); + connection.Open(); + var command = connection.CreateCommand(); + command.CommandText = "SELECT count(*) FROM Set_VmiMessage WITH(NOLOCK) where isConsumed = 0;"; + var count = Convert.ToInt64(command.ExecuteScalar().ToString()); scope.ServiceProvider.GetService>().Clients.All.ServerToClient("VmiBalance", count.ToString(), ""); return Task.CompletedTask; }