Browse Source

添加临时库存初始化导入;优化人工调整导入性能

master
wanggang 1 year ago
parent
commit
cea79e92f9
  1. 190
      code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAppService.cs
  2. 11
      code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncMessageService.cs

190
code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAppService.cs

@ -24,6 +24,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Omu.ValueInjecter;
using RestSharp.Extensions; using RestSharp.Extensions;
using SettleAccount.Job.SignalR; using SettleAccount.Job.SignalR;
using SqlSugar; using SqlSugar;
@ -489,6 +490,195 @@ namespace Win.Sfs.SettleAccount.Entities.BQ
} }
} }
/// <summary>
/// 库存初始化导入,临时使用
/// </summary>
/// <param name="files"></param>
[HttpPost]
public async Task<IActionResult> ImportForInit(List<IFormFile> files)
{
try
{
using var ms = new MemoryStream();
var file = files.FirstOrDefault();
await file.OpenReadStream().CopyToAsync(ms).ConfigureAwait(false);
var data = ms.ToArray();
var tupleList = this.ImportInternal<VmiLog>(data);
if (tupleList.Any(o => o.Item2.Count > 0))
{
using var workbook = new XLWorkbook(new MemoryStream(data));
var ws = workbook.Worksheets.FirstOrDefault();
var header = ws.Row(1);
var errorIndex = ws.ColumnsUsed().Count() + 1;
header.Cell(errorIndex).Value = "提示信息";
for (int i = 0; i < ws.RowsUsed().Count() - 1; i++)
{
ws.Row(i + 2).Cell(errorIndex).Value = string.Join(',', tupleList[i].Item2.Select(o => o.ErrorMessage));
}
SetStyle(ws);
using var stream = new MemoryStream();
workbook.SaveAs(stream);
stream.Seek(0, SeekOrigin.Begin);
var fileName = $"{file.Name}_错误信息.xlsx";
await this._fileContainer.SaveAsync(fileName, stream, true).ConfigureAwait(false);
return new JsonResult(new { code = 400, message = "输入异常", fileName });
}
var logList = tupleList.Select(o => o.Item1).ToList();
logList.AsParallel().ForEach(Update);
var messageList = logList.Select(log => new VmiMessage { Message = JsonSerializer.Serialize(log) }).ToList();
var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService");
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connectionString).Options;
using var context = new SettleAccountDbContext(options);
var st = new Stopwatch();
st.Start();
try
{
//导入日志和消息
using var transaction = context.Database.BeginTransaction();
await context.BulkInsertAsync(logList).ConfigureAwait(false);
await context.BulkInsertAsync(messageList).ConfigureAwait(false);
transaction.Commit();
//手动消费入库
await this.InvokeInternal(this._serviceProvider);
}
catch (Exception ex)
{
_logger.LogError(ex.ToString());
return new JsonResult(new { code = 500, message = ex.ToString() });
}
finally
{
st.Stop();
this._logger.LogInformation($"事务结束,耗时 ${st.ElapsedMilliseconds / 1000 / 60}分钟");
}
return new JsonResult(new { code = 200, message = "ok" });
}
catch (Exception ex)
{
this._logger.LogError(ex.ToString());
return new JsonResult(new { code = 500, data = ex.ToString(), message = ex.Message }); ;
}
}
private async Task InvokeInternal(IServiceProvider serviceProvider)
{
var batchSize = 10000;
var fetchSize = 0;
var connectionString = serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService");
while(true)
{
var balanceList = new List<Tuple<VmiBalance, int>>();
var sw = new Stopwatch();
sw.Start();
using var connection = new Microsoft.Data.SqlClient.SqlConnection(connectionString);
connection.Open();
using var transaction = connection.BeginTransaction();
try
{
var command = connection.CreateCommand();
command.Transaction = transaction;
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connection).Options;
using var context = new SettleAccountDbContext(options);
context.Database.UseTransaction(transaction);
var vmiMessageRepo = context.Set<VmiMessage>();
var vmiLogRepo = context.Set<VmiLog>();
//读取可消费消息列表
var messages = vmiMessageRepo.AsNoTracking().Where(o => !o.isConsumed).OrderBy(o => o.Number).Take(batchSize).ToList();
//没有可消费消息则返回
if (!messages.Any())
{
transaction.Commit();
break;
}
//设置数量为实际返回数量
fetchSize = messages.Count;
//反序列化获取库存事务
var list = messages.Select(o =>
{
var log = JsonSerializer.Deserialize<VmiLog>(o.Message);
var jsonElement = JsonSerializer.Deserialize<JsonElement>(o.Message);
var property = jsonElement.GetProperty("Id");
Console.WriteLine(jsonElement);
var id = Guid.Parse(property.GetString());
log.SetId(id);
return new LogToBalance { VmiMessage = o, Log = log, Table = $"Set_VmiLog_{log.ChangedTime.Year}_{(log.ChangedTime.Month - 1) / 3 + 1}" };
});
//获取分表名称
var tables = list.Select(o => o.Table).Distinct().ToList();
//创建分表
foreach (var table in tables)
{
command.CommandText = $"select OBJECT_ID('{table}', 'U')";
var result = command.ExecuteScalar().ToString();
if (result == string.Empty)
{
command.CommandText = $"select * into {table} from Set_VmiLog where 1=0;";
command.ExecuteNonQuery();
command.CommandText = $"alter table {table} add constraint PK_{table} primary key (Id);";
command.ExecuteNonQuery();
}
}
foreach (var item in list)
{
var message = item.VmiMessage;
//message.isConsumed = true;
var log = item.Log;
var balance = new VmiBalance();
balanceList.Add(new Tuple<VmiBalance, int>(balance, 1));
if (log.LogType == VmiLogType.Type300)
{//反结算入库,重建库存
var logHistory = vmiLogRepo.AsNoTracking().FirstOrDefault(
o => o.LogType == VmiLogType.Type100 &&
o.DeliverBillType == log.DeliverBillType &&
o.CodeType == log.CodeType &&
o.RealPartCode == log.RealPartCode &&
o.VinCode == log.VinCode &&
o.ErpToLoc == log.ErpToLoc &&
o.OrderNum == log.OrderNum &&
o.factory == log.factory &&
o.Configcode == log.Configcode);
if (logHistory != null)
{
balance.InjectFrom(logHistory);
}
else
{
balance.InjectFrom(log);
}
}
else
{
balance.InjectFrom(log);
}
balance.Qty = log.ChangedQty;
}
//更新事务分表
foreach (var item in tables)
{
var logs = list.Where(o => o.Table == item).Select(o => o.Log).ToList();
await context.BulkCopyAsync(new LinqToDB.Data.BulkCopyOptions { TableName = item }, logs).ConfigureAwait(false);
}
//批量更新消息
await context.BulkDeleteAsync(messages).ConfigureAwait(false);
//批量插入库存余额
await context.BulkInsertAsync(balanceList.Where(o => o.Item2 == 1 && o.Item1.Qty != decimal.Zero).Select(o => o.Item1).ToList()).ConfigureAwait(false);
transaction.Commit();
}
catch (Exception ex)
{
this._logger.LogError(ex.ToString());
throw new UserFriendlyException(ex.ToString(), "500");
}
finally
{
sw.Stop();
this._logger.LogInformation($"处理{fetchSize}条,耗时 {sw.ElapsedMilliseconds / 1000 / 60}分钟,{sw.ElapsedMilliseconds / 1000}秒");
}
}
}
private List<Tuple<T, List<ValidationResult>>> ImportInternal<T>(byte[] data) private List<Tuple<T, List<ValidationResult>>> ImportInternal<T>(byte[] data)
{ {
var list = new List<Tuple<T, List<ValidationResult>>>(); var list = new List<Tuple<T, List<ValidationResult>>>();

11
code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncMessageService.cs

@ -1,10 +1,12 @@
using System; using System;
using System.Data.SqlClient;
using System.Linq; using System.Linq;
using System.Linq.Dynamic.Core; using System.Linq.Dynamic.Core;
using System.Threading.Tasks; using System.Threading.Tasks;
using EFCore.BulkExtensions; using EFCore.BulkExtensions;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using SettleAccount.Job.SignalR; using SettleAccount.Job.SignalR;
using Volo.Abp.Application.Services; using Volo.Abp.Application.Services;
@ -29,9 +31,12 @@ namespace Win.Sfs.SettleAccount.Entities.BQ
public Task Invoke(IServiceProvider serviceProvider) public Task Invoke(IServiceProvider serviceProvider)
{ {
using var scope = serviceProvider.CreateScope(); using var scope = serviceProvider.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<SettleAccountDbContext>(); var connectionString = serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService");
db.Set<VmiMessage>().Where(o => o.isConsumed).BatchDelete(); using var connection = new SqlConnection(connectionString);
var count = db.Set<VmiMessage>().Where(o => !o.isConsumed).Count(); connection.Open();
var command = connection.CreateCommand();
command.CommandText = "SELECT count(*) FROM Set_VmiMessage WITH(NOLOCK) where isConsumed = 0;";
var count = Convert.ToInt64(command.ExecuteScalar().ToString());
scope.ServiceProvider.GetService<IHubContext<PageHub>>().Clients.All.ServerToClient("VmiBalance", count.ToString(), ""); scope.ServiceProvider.GetService<IHubContext<PageHub>>().Clients.All.ServerToClient("VmiBalance", count.ToString(), "");
return Task.CompletedTask; return Task.CompletedTask;
} }

Loading…
Cancel
Save