You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
286 lines
13 KiB
286 lines
13 KiB
using System;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.Linq;
|
|
using System.Linq.Dynamic.Core;
|
|
using System.Text.Json;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using EFCore.BulkExtensions;
|
|
using Magicodes.ExporterAndImporter.Core.Extension;
|
|
using Microsoft.AspNetCore.Mvc;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.Configuration;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging;
|
|
using Omu.ValueInjecter;
|
|
using Volo.Abp;
|
|
using Volo.Abp.Application.Services;
|
|
using Volo.Abp.DependencyInjection;
|
|
using Win.Sfs.SettleAccount.Entities.BQ.Vmi;
|
|
using Win.Sfs.Shared.RepositoryBase;
|
|
|
|
namespace Win.Sfs.SettleAccount.Entities.BQ;
|
|
|
|
/// <summary>
|
|
/// 异步更新库存
|
|
/// </summary>
|
|
[Route("[controller]/[action]")]
|
|
public class VmiAsyncBalanceService : Controller, IApplicationService, IJobService, ITransientDependency
|
|
{
|
|
// Service provider received through the constructor; Test() hands it to Invoke().
private readonly IServiceProvider _serviceProvider;

// Logger used for per-batch timing output and error reporting.
private readonly ILogger<VmiAsyncBalanceService> _logger;

// Single-permit gate shared by all instances (static): Invoke() awaits it so that
// only one balance-update run executes at a time. readonly so the shared gate
// can never be swapped out while a caller is holding or waiting on it.
private static readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1, 1);
|
|
|
|
/// <summary>
/// Creates the service and stores its injected dependencies.
/// </summary>
/// <param name="serviceProvider">Service provider kept for later use by <see cref="Test"/>.</param>
/// <param name="logger">Logger kept for timing and error output.</param>
public VmiAsyncBalanceService(IServiceProvider serviceProvider, ILogger<VmiAsyncBalanceService> logger)
    => (_serviceProvider, _logger) = (serviceProvider, logger);
|
|
|
|
/// <summary>
/// Manually triggers the asynchronous balance update (manual test entry point).
/// </summary>
/// <returns>The elapsed time of the run, in minutes.</returns>
[HttpPost]
public async Task<double> Test()
{
    // Stopwatch is monotonic; subtracting two DateTime.Now readings is skewed by
    // system clock or daylight-saving adjustments that happen during a long run.
    var stopwatch = Stopwatch.StartNew();
    await Invoke(_serviceProvider).ConfigureAwait(false);
    return stopwatch.Elapsed.TotalMinutes;
}
|
|
|
|
/// <summary>
/// Runs the balance update while holding the shared semaphore, so concurrent
/// callers are processed one after another. Not exposed as a controller action.
/// </summary>
/// <param name="serviceProvider">Service provider forwarded to the internal worker.</param>
[NonAction]
public async Task Invoke(IServiceProvider serviceProvider)
{
    var gate = semaphoreSlim;

    // Acquire outside the try block: the permit is only released if it was actually taken.
    await gate.WaitAsync().ConfigureAwait(false);
    try
    {
        var work = InvokeInternal(serviceProvider);
        await work.ConfigureAwait(false);
    }
    finally
    {
        gate.Release();
    }
}
|
|
|
|
/// <summary>
/// Consumes pending <see cref="VmiMessage"/> rows in batches and applies the log
/// contained in each message to the <see cref="VmiBalance"/> table.
/// </summary>
/// <remarks>
/// Runs at most 1000 batches of up to 1000 messages each and stops early when no
/// unconsumed message is left. Every batch uses its own SQL connection and
/// transaction; the transaction is committed only when the whole batch succeeded.
/// </remarks>
/// <param name="serviceProvider">Used to resolve <see cref="IConfiguration"/> for the connection string.</param>
/// <exception cref="UserFriendlyException">
/// Thrown (code "500") when any step of a batch fails; the original exception text is the message.
/// </exception>
private async Task InvokeInternal(IServiceProvider serviceProvider)
{
    var batchSize = 1000;
    var connectionString = serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService");
    for (var i = 0; i < 1000; i++)
    {
        // Declared per batch so the timing log of the final (empty) batch reports 0
        // instead of repeating the previous batch's count.
        var fetchSize = 0;
        // Item2 marks the origin of the balance: 1 = created in this batch, 2 = loaded from the database.
        var balanceList = new List<Tuple<VmiBalance, int>>();
        var vmiReplenishedList = new List<VmiReplenished>();
        var sw = new Stopwatch();
        sw.Start();
        using var connection = new Microsoft.Data.SqlClient.SqlConnection(connectionString);
        connection.Open();
        using var transaction = connection.BeginTransaction();
        try
        {
            // Disposed with the try block; shares the batch transaction.
            using var command = connection.CreateCommand();
            command.Transaction = transaction;
            var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connection).Options;
            using var context = new SettleAccountDbContext(options);
            context.Database.UseTransaction(transaction);
            var vmiMessageRepo = context.Set<VmiMessage>();
            var vmiLogRepo = context.Set<VmiLog>();
            var vmiBalanceRepo = context.Set<VmiBalance>();
            var vmiReplenishedRepo = context.Set<VmiReplenished>();

            // Read the next batch of messages that have not been consumed yet.
            var messages = vmiMessageRepo.AsNoTracking().Where(o => !o.isConsumed).OrderBy(o => o.Number).Take(batchSize).ToList();

            // Nothing left to consume: finish.
            if (!messages.Any())
            {
                transaction.Commit();
                break;
            }

            // Record how many messages were actually fetched.
            fetchSize = messages.Count;

            // Deserialize each message into its log entry. Materialized once: the
            // projection is read twice below (table names, then processing), and a
            // deferred query would deserialize every message again on each pass.
            var list = messages.Select(o =>
            {
                var log = JsonSerializer.Deserialize<VmiLog>(o.Message);
                // The Id is read from the raw JSON and assigned explicitly via SetId.
                var jsonElement = JsonSerializer.Deserialize<JsonElement>(o.Message);
                var property = jsonElement.GetProperty("Id");
                var id = Guid.Parse(property.GetString());
                log.SetId(id);
                // One log table per calendar quarter: Set_VmiLog_{year}_{quarter}.
                return new LogToBalance { VmiMessage = o, Log = log, Table = $"Set_VmiLog_{log.ChangedTime.Year}_{(log.ChangedTime.Month - 1) / 3 + 1}" };
            }).ToList();

            // Distinct quarterly table names needed by this batch.
            var tables = list.Select(o => o.Table).Distinct().ToList();

            // Create any quarterly table that does not exist yet.
            // Table names are built from integer year/quarter values only.
            foreach (var table in tables)
            {
                command.CommandText = $"select OBJECT_ID('{table}', 'U')";
                // A NULL result (DBNull) stringifies to an empty string, meaning "table is missing".
                var result = command.ExecuteScalar().ToString();
                if (result == string.Empty)
                {
                    command.CommandText = $"select * into {table} from Set_VmiLog where 1=0;";
                    command.ExecuteNonQuery();
                    command.CommandText = $"create clustered index IX_{table}_ChangedTime on {table} (ChangedTime);";
                    command.ExecuteNonQuery();
                    command.CommandText = $"alter table {table} add constraint PK_{table} primary key (Id);";
                    command.ExecuteNonQuery();
                }
            }

            foreach (var item in list)
            {
                var message = item.VmiMessage;
                message.isConsumed = true;
                var log = item.Log;

                // Look in the balances already touched by this batch first.
                var balance = balanceList.Select(o => o.Item1).FirstOrDefault(
                    o => o.DeliverBillType == log.DeliverBillType &&
                         o.CodeType == log.CodeType &&
                         o.VinCode == log.VinCode &&
                         o.ErpToLoc == log.ErpToLoc &&
                         o.OrderNum == log.OrderNum &&
                         o.factory == log.factory &&
                         o.Configcode == log.Configcode);

                // Fall back to the database.
                if (balance == null)
                {
                    balance = vmiBalanceRepo.FirstOrDefault(
                        o => o.DeliverBillType == log.DeliverBillType &&
                             o.CodeType == log.CodeType &&
                             o.VinCode == log.VinCode &&
                             o.ErpToLoc == log.ErpToLoc &&
                             o.OrderNum == log.OrderNum &&
                             o.factory == log.factory &&
                             o.Configcode == log.Configcode);
                }

                if (balance == null)
                {
                    // No balance record exists: create one.
                    balance = new VmiBalance();
                    balanceList.Add(new Tuple<VmiBalance, int>(balance, 1));
                    if (log.LogType == VmiLogType.Type300)
                    {
                        // Reverse-settlement inbound: rebuild the balance from the
                        // matching Type100 log when one exists, otherwise from this log.
                        var logHistory = vmiLogRepo.AsNoTracking().FirstOrDefault(
                            o => o.LogType == VmiLogType.Type100 &&
                                 o.DeliverBillType == log.DeliverBillType &&
                                 o.CodeType == log.CodeType &&
                                 o.VinCode == log.VinCode &&
                                 o.ErpToLoc == log.ErpToLoc &&
                                 o.OrderNum == log.OrderNum &&
                                 o.factory == log.factory &&
                                 o.Configcode == log.Configcode);
                        if (logHistory != null)
                        {
                            balance.InjectFrom(logHistory);
                        }
                        else
                        {
                            balance.InjectFrom(log);
                        }
                    }
                    else
                    {
                        balance.InjectFrom(log);
                    }
                    balance.Qty = log.ChangedQty;
                }
                else
                {
                    // A balance record exists; register it for update if it came from the database.
                    if (!balanceList.Any(o => o.Item1.Id == balance.Id))
                    {
                        balanceList.Add(new Tuple<VmiBalance, int>(balance, 2));
                    }
                    var logType = log.LogType;
                    // Captured before any InjectFrom call so the quantity update below
                    // always starts from the balance's own quantity.
                    var currentQty = balance.Qty;
                    if (logType == VmiLogType.Type100)
                    {
                        // Shipping inbound: a negative balance takes its fields from the log.
                        if (balance.Qty < decimal.Zero)
                        {
                            balance.InjectFrom(log);
                        }
                    }
                    else if (logType == VmiLogType.Type300)
                    {
                        // Reverse-settlement inbound: only the quantity is updated.
                    }
                    else if (logType == VmiLogType.Type500)
                    {
                        // Adjustment inbound: update the quantity and the other fields.
                        balance.InjectFrom(log);
                    }

                    // NOTE(review): this test runs after InjectFrom(log) above; if InjectFrom
                    // copies a Qty value from the log, it sees that value rather than
                    // currentQty - confirm which one is intended.
                    if (logType == VmiLogType.Type100 && balance.Qty < decimal.Zero)
                    {
                        // Record a replenishment entry for the negative balance.
                        var log2 = new VmiReplenished();
                        log2.InjectFrom(log);
                        vmiReplenishedList.Add(log2);
                    }

                    // Apply the quantity change.
                    balance.Qty = currentQty + log.ChangedQty;
                    if (balance.Qty == decimal.Zero)
                    {
                        // Zero balance: mark for removal in the context. SaveChanges is not
                        // called in this method; the actual delete is done by BulkDeleteAsync below.
                        vmiBalanceRepo.Remove(balance);
                    }
                }
            }

            // Insert into the quarterly log tables (disabled in the original code).
            //foreach (var item in tables)
            //{
            //    var logs = list.Where(o => o.Table == item).Select(o => o.Log).ToList();
            //    var xxx= logs.Select(o=>o.Id).Distinct().ToList();
            //    await context.BulkInsertAsync(logs, new BulkConfig
            //    {
            //        CustomDestinationTableName = item,
            //    }).ConfigureAwait(false);
            //}

            // Bulk-update the messages (now flagged as consumed), leaving Number untouched.
            await context.BulkUpdateAsync(messages, new BulkConfig
            {
                PropertiesToExclude = new List<string> { nameof(VmiMessage.Number) }
            }).ConfigureAwait(false);

            // Bulk-insert the negative-stock replenishment records.
            await context.BulkInsertAsync(vmiReplenishedList).ConfigureAwait(false);

            // Bulk-insert balances created in this batch that ended non-zero.
            var addList = balanceList.Where(o => o.Item2 == 1 && o.Item1.Qty != decimal.Zero).Select(o => o.Item1).ToList();
            await context.BulkInsertAsync(addList).ConfigureAwait(false);

            // Bulk-update existing balances that ended non-zero.
            var updateList = balanceList.Where(o => o.Item2 == 2 && o.Item1.Qty != decimal.Zero).Select(o => o.Item1).ToList();
            await context.BulkUpdateAsync(updateList).ConfigureAwait(false);

            // Bulk-delete existing balances that ended at zero.
            var deleteList = balanceList.Where(o => o.Item2 == 2 && o.Item1.Qty == decimal.Zero).Select(o => o.Item1).ToList();
            await context.BulkDeleteAsync(deleteList).ConfigureAwait(false);

            transaction.Commit();
        }
        catch (Exception ex)
        {
            // No explicit rollback: the transaction is never committed on this path
            // and is disposed when the loop body is left.
            this._logger.LogError(ex.ToString());
            throw new UserFriendlyException(ex.ToString(), "500");
        }
        finally
        {
            sw.Stop();
            this._logger.LogInformation($"处理{fetchSize}条,耗时 {sw.ElapsedMilliseconds / 1000 / 60}分钟,{sw.ElapsedMilliseconds / 1000}秒");
        }
    }
}
|
|
}
|
|
|
|
/// <summary>
/// Groups a queued message with the log deserialized from it and the name of
/// the log table computed for that log.
/// </summary>
public class LogToBalance
{
    /// <summary>The message row the log was read from.</summary>
    public VmiMessage VmiMessage { get; set; }

    /// <summary>The log entry deserialized from the message payload.</summary>
    public VmiLog Log { get; set; }

    /// <summary>Name of the log table associated with <see cref="Log"/>.</summary>
    public string Table { get; set; }
}
|
|
|