wanggang 1 year ago
parent
commit
042950255e
  1. 212
      code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncBalanceService.cs
  2. 173
      code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncMessageService.cs

212
code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncBalanceService.cs

@ -4,15 +4,11 @@ using System.Linq;
using System.Linq.Dynamic.Core;
using System.Text.Json;
using System.Threading.Tasks;
using EFCore.BulkExtensions;
using Magicodes.ExporterAndImporter.Core.Extension;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Omu.ValueInjecter;
using SettleAccount.Job.SignalR;
using Volo.Abp.Application.Services;
using Volo.Abp.DependencyInjection;
using Win.Sfs.SettleAccount.Entities.BQ.Vmi;
@ -34,140 +30,114 @@ public class VmiAsyncBalanceService : ApplicationService, IJobService, ITransien
public async Task Invoke(IServiceProvider serviceProvider)
{
var connectionString = serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService");
using var connection = new SqlConnection(connectionString);
connection.Open();
using var transaction = connection.BeginTransaction();
try
for (var i = 0; i < 30; i++)
{
var command = connection.CreateCommand();
command.Transaction = transaction;
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connection).Options;
using var context = new SettleAccountDbContext(options);
context.Database.UseTransaction(transaction);
var messages = context.Set<VmiMessage>().Where(o => !o.isConsumed).OrderBy(o => o.Number).Take(1000 * 100).ToList();
var repo = context.Set<VmiBalance>();
foreach (var message in messages)
var connectionString = serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService");
using var connection = new SqlConnection(connectionString);
connection.Open();
using var transaction = connection.BeginTransaction();
try
{
var log = JsonSerializer.Deserialize<VmiLog>(message.Message);
log.SetId(Guid.Parse(JsonSerializer.Deserialize<JsonElement>(message.Message).GetProperty("Id").GetString()));
//插入分表
var table = $"Set_VmiLog_{log.ChangedTime.Year}_{(log.ChangedTime.Month - 1) / 3 + 1}";
command.CommandText = $"select OBJECT_ID('{table}', 'U')";
var result = command.ExecuteScalar().ToString();
if (result == string.Empty)
var command = connection.CreateCommand();
command.Transaction = transaction;
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connection).Options;
using var context = new SettleAccountDbContext(options);
context.Database.UseTransaction(transaction);
if (!context.Set<VmiMessage>().Any(o => !o.isConsumed))
{
command.CommandText = $"select * into {table} from Set_VmiLog where 1=0;";
command.ExecuteNonQuery();
command.CommandText = $"create clustered index IX_{table}_ChangedTime on {table} (ChangedTime);";
command.ExecuteNonQuery();
command.CommandText = $"alter table {table} add constraint PK_{table} primary key (Id);";
command.ExecuteNonQuery();
break;
}
//插入到分表
command.CommandText = $"insert into {table} select * from Set_VmiLog where id ='{log.Id}'";
command.ExecuteNonQuery();
//插入库存
var balance = context.Set<VmiBalance>().FirstOrDefault(
o => o.DeliverBillType == log.DeliverBillType &&
o.CodeType == log.CodeType &&
o.DeliverBillType == log.DeliverBillType &&
o.VinCode == log.VinCode &&
o.ErpToLoc == log.ErpToLoc &&
o.OrderNum == log.OrderNum &&
o.factory == log.factory &&
o.Configcode == log.Configcode);
if (balance == null)
var messages = context.Set<VmiMessage>().Where(o => !o.isConsumed).OrderBy(o => o.Number).Take(1000).ToList();
var repo = context.Set<VmiBalance>();
foreach (var message in messages)
{
balance = new VmiBalance(GuidGenerator.Create());
balance.InjectFrom(log);
await repo.AddAsync(balance).ConfigureAwait(false);
log.InjectFrom(balance);
}
else
{
var logType = log.LogType;
var qty = balance.Qty;// + log.ty
if (logType == VmiLogType.Type100)
{
//发运入库,负库存字段需要更新
if (balance.Qty < decimal.Zero)
{
balance.InjectFrom(log);
}
}
else if (logType == VmiLogType.Type300)
var log = JsonSerializer.Deserialize<VmiLog>(message.Message);
log.SetId(Guid.Parse(JsonSerializer.Deserialize<JsonElement>(message.Message).GetProperty("Id").GetString()));
//插入分表
var table = $"Set_VmiLog_{log.ChangedTime.Year}_{(log.ChangedTime.Month - 1) / 3 + 1}";
command.CommandText = $"select OBJECT_ID('{table}', 'U')";
var result = command.ExecuteScalar().ToString();
if (result == string.Empty)
{
//反结入库,只更新库存
command.CommandText = $"select * into {table} from Set_VmiLog where 1=0;";
command.ExecuteNonQuery();
command.CommandText = $"create clustered index IX_{table}_ChangedTime on {table} (ChangedTime);";
command.ExecuteNonQuery();
command.CommandText = $"alter table {table} add constraint PK_{table} primary key (Id);";
command.ExecuteNonQuery();
}
else if (logType == VmiLogType.Type500)
//插入到分表
command.CommandText = $"insert into {table} select * from Set_VmiLog where id ='{log.Id}'";
command.ExecuteNonQuery();
//插入库存
var balance = context.Set<VmiBalance>().FirstOrDefault(
o => o.DeliverBillType == log.DeliverBillType &&
o.CodeType == log.CodeType &&
o.DeliverBillType == log.DeliverBillType &&
o.VinCode == log.VinCode &&
o.ErpToLoc == log.ErpToLoc &&
o.OrderNum == log.OrderNum &&
o.factory == log.factory &&
o.Configcode == log.Configcode);
if (balance == null)
{
//调整入库,更新库存和其他字段
balance = new VmiBalance(GuidGenerator.Create());
balance.InjectFrom(log);
await repo.AddAsync(balance).ConfigureAwait(false);
log.InjectFrom(balance);
}
// 更新库存
balance.Qty = qty;
if (balance.Qty == decimal.Zero)
{
//删除0库存
repo.Remove(balance);
}
if (logType == VmiLogType.Type100 && balance.Qty < decimal.Zero && log.Qty > 0)
else
{
//添加负库存补货记录
var log2 = new VmiReplenished();
log2.InjectFrom(log);
await context.Set<VmiReplenished>().AddAsync(log2).ConfigureAwait(false);
var logType = log.LogType;
var qty = balance.Qty;// + log.ty
if (logType == VmiLogType.Type100)
{
//发运入库,负库存字段需要更新
if (balance.Qty < decimal.Zero)
{
balance.InjectFrom(log);
}
}
else if (logType == VmiLogType.Type300)
{
//反结入库,只更新库存
}
else if (logType == VmiLogType.Type500)
{
//调整入库,更新库存和其他字段
balance.InjectFrom(log);
}
// 更新库存
balance.Qty = qty;
if (balance.Qty == decimal.Zero)
{
//删除0库存
repo.Remove(balance);
}
if (logType == VmiLogType.Type100 && balance.Qty < decimal.Zero && log.Qty > 0)
{
//添加负库存补货记录
var log2 = new VmiReplenished();
log2.InjectFrom(log);
await context.Set<VmiReplenished>().AddAsync(log2).ConfigureAwait(false);
}
}
message.isConsumed = true;
}
message.isConsumed = true;
context.SaveChanges();
transaction.Commit();
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
transaction.Rollback();
throw;
}
context.SaveChanges();
transaction.Commit();
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
transaction.Rollback();
throw;
}
}
}
/// <summary>
/// 消息表定时清理
/// </summary>
public class VmiAsyncMessageService : Controller, IApplicationService, IJobService, ITransientDependency
{
private readonly IServiceProvider _serviceProvider;
public VmiAsyncMessageService(IServiceProvider serviceProvider)
{
this._serviceProvider = serviceProvider;
}
[NonAction]
public Task Invoke(IServiceProvider serviceProvider)
{
using var scope = serviceProvider.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<SettleAccountDbContext>();
db.Set<VmiMessage>().Where(o => o.isConsumed).BatchDelete();
var count = db.Set<VmiMessage>().Where(o => !o.isConsumed).Count();
scope.ServiceProvider.GetService<IHubContext<PageHub>>().Clients.All.ServerToClient("VmiBalance", count.ToString(), "");
return Task.CompletedTask;
}
/// <summary>
/// 未处理消息数量
/// </summary>
/// <returns></returns>
[HttpPost]
public int GetMessageCount()
public async Task InvokeInternal(IServiceProvider serviceProvider)
{
using var scope = this._serviceProvider.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<SettleAccountDbContext>();
var count = db.Set<VmiMessage>().Where(o => !o.isConsumed).Count();
return count;
}
}

173
code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncMessageService.cs

@ -0,0 +1,173 @@
using System;
using System.Data.SqlClient;
using System.Linq;
using System.Linq.Dynamic.Core;
using System.Text.Json;
using System.Threading.Tasks;
using EFCore.BulkExtensions;
using Magicodes.ExporterAndImporter.Core.Extension;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Omu.ValueInjecter;
using SettleAccount.Job.SignalR;
using Volo.Abp.Application.Services;
using Volo.Abp.DependencyInjection;
using Win.Sfs.SettleAccount.Entities.BQ.Vmi;
using Win.Sfs.Shared.RepositoryBase;
namespace Win.Sfs.SettleAccount.Entities.BQ;
/// <summary>
/// 异步更新库存
/// </summary>
public class VmiAsyncBalanceService : ApplicationService, IJobService, ITransientDependency
{
private readonly IServiceProvider _serviceProvider;
public VmiAsyncBalanceService(IServiceProvider serviceProvider)
{
this._serviceProvider = serviceProvider;
}
public async Task Invoke(IServiceProvider serviceProvider)
{
var connectionString = serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService");
using var connection = new SqlConnection(connectionString);
connection.Open();
using var transaction = connection.BeginTransaction();
try
{
var command = connection.CreateCommand();
command.Transaction = transaction;
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connection).Options;
using var context = new SettleAccountDbContext(options);
context.Database.UseTransaction(transaction);
var messages = context.Set<VmiMessage>().Where(o => !o.isConsumed).OrderBy(o => o.Number).Take(1000 * 100).ToList();
var repo = context.Set<VmiBalance>();
foreach (var message in messages)
{
var log = JsonSerializer.Deserialize<VmiLog>(message.Message);
log.SetId(Guid.Parse(JsonSerializer.Deserialize<JsonElement>(message.Message).GetProperty("Id").GetString()));
//插入分表
var table = $"Set_VmiLog_{log.ChangedTime.Year}_{(log.ChangedTime.Month - 1) / 3 + 1}";
command.CommandText = $"select OBJECT_ID('{table}', 'U')";
var result = command.ExecuteScalar().ToString();
if (result == string.Empty)
{
command.CommandText = $"select * into {table} from Set_VmiLog where 1=0;";
command.ExecuteNonQuery();
command.CommandText = $"create clustered index IX_{table}_ChangedTime on {table} (ChangedTime);";
command.ExecuteNonQuery();
command.CommandText = $"alter table {table} add constraint PK_{table} primary key (Id);";
command.ExecuteNonQuery();
}
//插入到分表
command.CommandText = $"insert into {table} select * from Set_VmiLog where id ='{log.Id}'";
command.ExecuteNonQuery();
//插入库存
var balance = context.Set<VmiBalance>().FirstOrDefault(
o => o.DeliverBillType == log.DeliverBillType &&
o.CodeType == log.CodeType &&
o.DeliverBillType == log.DeliverBillType &&
o.VinCode == log.VinCode &&
o.ErpToLoc == log.ErpToLoc &&
o.OrderNum == log.OrderNum &&
o.factory == log.factory &&
o.Configcode == log.Configcode);
if (balance == null)
{
balance = new VmiBalance(GuidGenerator.Create());
balance.InjectFrom(log);
await repo.AddAsync(balance).ConfigureAwait(false);
log.InjectFrom(balance);
}
else
{
var logType = log.LogType;
var qty = balance.Qty;// + log.ty
if (logType == VmiLogType.Type100)
{
//发运入库,负库存字段需要更新
if (balance.Qty < decimal.Zero)
{
balance.InjectFrom(log);
}
}
else if (logType == VmiLogType.Type300)
{
//反结入库,只更新库存
}
else if (logType == VmiLogType.Type500)
{
//调整入库,更新库存和其他字段
balance.InjectFrom(log);
}
// 更新库存
balance.Qty = qty;
if (balance.Qty == decimal.Zero)
{
//删除0库存
repo.Remove(balance);
}
if (logType == VmiLogType.Type100 && balance.Qty < decimal.Zero && log.Qty > 0)
{
//添加负库存补货记录
var log2 = new VmiReplenished();
log2.InjectFrom(log);
await context.Set<VmiReplenished>().AddAsync(log2).ConfigureAwait(false);
}
}
message.isConsumed = true;
}
context.SaveChanges();
transaction.Commit();
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
transaction.Rollback();
throw;
}
}
}
/// <summary>
/// 消息表定时清理
/// </summary>
public class VmiAsyncMessageService : Controller, IApplicationService, IJobService, ITransientDependency
{
private readonly IServiceProvider _serviceProvider;
public VmiAsyncMessageService(IServiceProvider serviceProvider)
{
this._serviceProvider = serviceProvider;
}
[NonAction]
public Task Invoke(IServiceProvider serviceProvider)
{
using var scope = serviceProvider.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<SettleAccountDbContext>();
db.Set<VmiMessage>().Where(o => o.isConsumed).BatchDelete();
var count = db.Set<VmiMessage>().Where(o => !o.isConsumed).Count();
scope.ServiceProvider.GetService<IHubContext<PageHub>>().Clients.All.ServerToClient("VmiBalance", count.ToString(), "");
return Task.CompletedTask;
}
/// <summary>
/// 未处理消息数量
/// </summary>
/// <returns></returns>
[HttpPost]
public int GetMessageCount()
{
using var scope = this._serviceProvider.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<SettleAccountDbContext>();
var count = db.Set<VmiMessage>().Where(o => !o.isConsumed).Count();
return count;
}
}
Loading…
Cancel
Save