diff --git a/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncBalanceService.cs b/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncBalanceService.cs index be57c1c2..4f5f1991 100644 --- a/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncBalanceService.cs +++ b/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncBalanceService.cs @@ -4,15 +4,11 @@ using System.Linq; using System.Linq.Dynamic.Core; using System.Text.Json; using System.Threading.Tasks; -using EFCore.BulkExtensions; using Magicodes.ExporterAndImporter.Core.Extension; -using Microsoft.AspNetCore.Mvc; -using Microsoft.AspNetCore.SignalR; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Omu.ValueInjecter; -using SettleAccount.Job.SignalR; using Volo.Abp.Application.Services; using Volo.Abp.DependencyInjection; using Win.Sfs.SettleAccount.Entities.BQ.Vmi; @@ -34,140 +30,114 @@ public class VmiAsyncBalanceService : ApplicationService, IJobService, ITransien public async Task Invoke(IServiceProvider serviceProvider) { - var connectionString = serviceProvider.GetRequiredService().GetConnectionString("SettleAccountService"); - using var connection = new SqlConnection(connectionString); - connection.Open(); - using var transaction = connection.BeginTransaction(); - try + for (var i = 0; i < 30; i++) { - var command = connection.CreateCommand(); - command.Transaction = transaction; - var options = new DbContextOptionsBuilder().UseSqlServer(connection).Options; - using var context = new SettleAccountDbContext(options); - context.Database.UseTransaction(transaction); - var messages = context.Set().Where(o => !o.isConsumed).OrderBy(o => o.Number).Take(1000 * 100).ToList(); - var repo = context.Set(); - foreach (var message in messages) + var connectionString = serviceProvider.GetRequiredService().GetConnectionString("SettleAccountService"); + using var connection = new SqlConnection(connectionString); + connection.Open(); + using var transaction = connection.BeginTransaction(); + try { - var log = JsonSerializer.Deserialize(message.Message); - log.SetId(Guid.Parse(JsonSerializer.Deserialize(message.Message).GetProperty("Id").GetString())); - //插入分表 - var table = $"Set_VmiLog_{log.ChangedTime.Year}_{(log.ChangedTime.Month - 1) / 3 + 1}"; - command.CommandText = $"select OBJECT_ID('{table}', 'U')"; - var result = command.ExecuteScalar().ToString(); - if (result == string.Empty) + var command = connection.CreateCommand(); + command.Transaction = transaction; + var options = new DbContextOptionsBuilder().UseSqlServer(connection).Options; + using var context = new SettleAccountDbContext(options); + context.Database.UseTransaction(transaction); + if (!context.Set().Any(o => !o.isConsumed)) { - command.CommandText = $"select * into {table} from Set_VmiLog where 1=0;"; - command.ExecuteNonQuery(); - command.CommandText = $"create clustered index IX_{table}_ChangedTime on {table} (ChangedTime);"; - command.ExecuteNonQuery(); - command.CommandText = $"alter table {table} add constraint PK_{table} primary key (Id);"; - command.ExecuteNonQuery(); + break; } - //插入到分表 - command.CommandText = $"insert into {table} select * from Set_VmiLog where id ='{log.Id}'"; - command.ExecuteNonQuery(); - //插入库存 - var balance = context.Set().FirstOrDefault( - o => o.DeliverBillType == log.DeliverBillType && - o.CodeType == log.CodeType && - o.DeliverBillType == log.DeliverBillType && - o.VinCode == log.VinCode && - o.ErpToLoc == log.ErpToLoc && - o.OrderNum == log.OrderNum && - o.factory == log.factory && - o.Configcode == log.Configcode); - if (balance == null) + var messages = context.Set().Where(o => !o.isConsumed).OrderBy(o => o.Number).Take(1000).ToList(); + var repo = context.Set(); + foreach (var message in messages) { - balance = new VmiBalance(GuidGenerator.Create()); - balance.InjectFrom(log); - await repo.AddAsync(balance).ConfigureAwait(false); - log.InjectFrom(balance); - } - else - { - var logType = log.LogType; - - var qty = balance.Qty;// + log.ty - if (logType == VmiLogType.Type100) - { - //发运入库,负库存字段需要更新 - if (balance.Qty < decimal.Zero) - { - balance.InjectFrom(log); - } - } - else if (logType == VmiLogType.Type300) + var log = JsonSerializer.Deserialize(message.Message); + log.SetId(Guid.Parse(JsonSerializer.Deserialize(message.Message).GetProperty("Id").GetString())); + //插入分表 + var table = $"Set_VmiLog_{log.ChangedTime.Year}_{(log.ChangedTime.Month - 1) / 3 + 1}"; + command.CommandText = $"select OBJECT_ID('{table}', 'U')"; + var result = command.ExecuteScalar().ToString(); + if (result == string.Empty) { - //反结入库,只更新库存 + command.CommandText = $"select * into {table} from Set_VmiLog where 1=0;"; + command.ExecuteNonQuery(); + command.CommandText = $"create clustered index IX_{table}_ChangedTime on {table} (ChangedTime);"; + command.ExecuteNonQuery(); + command.CommandText = $"alter table {table} add constraint PK_{table} primary key (Id);"; + command.ExecuteNonQuery(); } - else if (logType == VmiLogType.Type500) + //插入到分表 + command.CommandText = $"insert into {table} select * from Set_VmiLog where id ='{log.Id}'"; + command.ExecuteNonQuery(); + //插入库存 + var balance = context.Set().FirstOrDefault( + o => o.DeliverBillType == log.DeliverBillType && + o.CodeType == log.CodeType && + o.DeliverBillType == log.DeliverBillType && + o.VinCode == log.VinCode && + o.ErpToLoc == log.ErpToLoc && + o.OrderNum == log.OrderNum && + o.factory == log.factory && + o.Configcode == log.Configcode); + if (balance == null) { - //调整入库,更新库存和其他字段 + balance = new VmiBalance(GuidGenerator.Create()); balance.InjectFrom(log); + await repo.AddAsync(balance).ConfigureAwait(false); + log.InjectFrom(balance); } - // 更新库存 - balance.Qty = qty; - if (balance.Qty == decimal.Zero) - { - //删除0库存 - repo.Remove(balance); - } - if (logType == VmiLogType.Type100 && balance.Qty < decimal.Zero && log.Qty > 0) + else { - //添加负库存补货记录 - var log2 = new VmiReplenished(); - log2.InjectFrom(log); - await context.Set().AddAsync(log2).ConfigureAwait(false); + var logType = log.LogType; + + var qty = balance.Qty;// + log.ty + if (logType == VmiLogType.Type100) + { + //发运入库,负库存字段需要更新 + if (balance.Qty < decimal.Zero) + { + balance.InjectFrom(log); + } + } + else if (logType == VmiLogType.Type300) + { + //反结入库,只更新库存 + } + else if (logType == VmiLogType.Type500) + { + //调整入库,更新库存和其他字段 + balance.InjectFrom(log); + } + // 更新库存 + balance.Qty = qty; + if (balance.Qty == decimal.Zero) + { + //删除0库存 + repo.Remove(balance); + } + if (logType == VmiLogType.Type100 && balance.Qty < decimal.Zero && log.Qty > 0) + { + //添加负库存补货记录 + var log2 = new VmiReplenished(); + log2.InjectFrom(log); + await context.Set().AddAsync(log2).ConfigureAwait(false); + } } + message.isConsumed = true; } - message.isConsumed = true; + context.SaveChanges(); + transaction.Commit(); + } + catch (Exception ex) + { + Console.WriteLine(ex.ToString()); + transaction.Rollback(); + throw; } - context.SaveChanges(); - transaction.Commit(); - } - catch (Exception ex) - { - Console.WriteLine(ex.ToString()); - transaction.Rollback(); - throw; } } -} - -/// -/// 消息表定时清理 -/// -public class VmiAsyncMessageService : Controller, IApplicationService, IJobService, ITransientDependency -{ - private readonly IServiceProvider _serviceProvider; - - public VmiAsyncMessageService(IServiceProvider serviceProvider) - { - this._serviceProvider = serviceProvider; - } - - [NonAction] - public Task Invoke(IServiceProvider serviceProvider) - { - using var scope = serviceProvider.CreateScope(); - var db = scope.ServiceProvider.GetRequiredService(); - db.Set().Where(o => o.isConsumed).BatchDelete(); - var count = db.Set().Where(o => !o.isConsumed).Count(); - scope.ServiceProvider.GetService>().Clients.All.ServerToClient("VmiBalance", count.ToString(), ""); - return Task.CompletedTask; - } - /// - /// 未处理消息数量 - /// - /// - [HttpPost] - public int GetMessageCount() + public async Task InvokeInternal(IServiceProvider serviceProvider) { - using var scope = this._serviceProvider.CreateScope(); - var db = scope.ServiceProvider.GetRequiredService(); - var count = db.Set().Where(o => !o.isConsumed).Count(); - return count; } } diff --git a/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncMessageService.cs b/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncMessageService.cs new file mode 100644 index 00000000..be57c1c2 --- /dev/null +++ b/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/VmiAsyncMessageService.cs @@ -0,0 +1,173 @@ +using System; +using System.Data.SqlClient; +using System.Linq; +using System.Linq.Dynamic.Core; +using System.Text.Json; +using System.Threading.Tasks; +using EFCore.BulkExtensions; +using Magicodes.ExporterAndImporter.Core.Extension; +using Microsoft.AspNetCore.Mvc; +using Microsoft.AspNetCore.SignalR; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Omu.ValueInjecter; +using SettleAccount.Job.SignalR; +using Volo.Abp.Application.Services; +using Volo.Abp.DependencyInjection; +using Win.Sfs.SettleAccount.Entities.BQ.Vmi; +using Win.Sfs.Shared.RepositoryBase; + +namespace Win.Sfs.SettleAccount.Entities.BQ; + +/// +/// 异步更新库存 +/// +public class VmiAsyncBalanceService : ApplicationService, IJobService, ITransientDependency +{ + private readonly IServiceProvider _serviceProvider; + + public VmiAsyncBalanceService(IServiceProvider serviceProvider) + { + this._serviceProvider = serviceProvider; + } + + public async Task Invoke(IServiceProvider serviceProvider) + { + var connectionString = serviceProvider.GetRequiredService().GetConnectionString("SettleAccountService"); + using var connection = new SqlConnection(connectionString); + connection.Open(); + using var transaction = connection.BeginTransaction(); + try + { + var command = connection.CreateCommand(); + command.Transaction = transaction; + var options = new DbContextOptionsBuilder().UseSqlServer(connection).Options; + using var context = new SettleAccountDbContext(options); + context.Database.UseTransaction(transaction); + var messages = context.Set().Where(o => !o.isConsumed).OrderBy(o => o.Number).Take(1000 * 100).ToList(); + var repo = context.Set(); + foreach (var message in messages) + { + var log = JsonSerializer.Deserialize(message.Message); + log.SetId(Guid.Parse(JsonSerializer.Deserialize(message.Message).GetProperty("Id").GetString())); + //插入分表 + var table = $"Set_VmiLog_{log.ChangedTime.Year}_{(log.ChangedTime.Month - 1) / 3 + 1}"; + command.CommandText = $"select OBJECT_ID('{table}', 'U')"; + var result = command.ExecuteScalar().ToString(); + if (result == string.Empty) + { + command.CommandText = $"select * into {table} from Set_VmiLog where 1=0;"; + command.ExecuteNonQuery(); + command.CommandText = $"create clustered index IX_{table}_ChangedTime on {table} (ChangedTime);"; + command.ExecuteNonQuery(); + command.CommandText = $"alter table {table} add constraint PK_{table} primary key (Id);"; + command.ExecuteNonQuery(); + } + //插入到分表 + command.CommandText = $"insert into {table} select * from Set_VmiLog where id ='{log.Id}'"; + command.ExecuteNonQuery(); + //插入库存 + var balance = context.Set().FirstOrDefault( + o => o.DeliverBillType == log.DeliverBillType && + o.CodeType == log.CodeType && + o.DeliverBillType == log.DeliverBillType && + o.VinCode == log.VinCode && + o.ErpToLoc == log.ErpToLoc && + o.OrderNum == log.OrderNum && + o.factory == log.factory && + o.Configcode == log.Configcode); + if (balance == null) + { + balance = new VmiBalance(GuidGenerator.Create()); + balance.InjectFrom(log); + await repo.AddAsync(balance).ConfigureAwait(false); + log.InjectFrom(balance); + } + else + { + var logType = log.LogType; + + var qty = balance.Qty;// + log.ty + if (logType == VmiLogType.Type100) + { + //发运入库,负库存字段需要更新 + if (balance.Qty < decimal.Zero) + { + balance.InjectFrom(log); + } + } + else if (logType == VmiLogType.Type300) + { + //反结入库,只更新库存 + } + else if (logType == VmiLogType.Type500) + { + //调整入库,更新库存和其他字段 + balance.InjectFrom(log); + } + // 更新库存 + balance.Qty = qty; + if (balance.Qty == decimal.Zero) + { + //删除0库存 + repo.Remove(balance); + } + if (logType == VmiLogType.Type100 && balance.Qty < decimal.Zero && log.Qty > 0) + { + //添加负库存补货记录 + var log2 = new VmiReplenished(); + log2.InjectFrom(log); + await context.Set().AddAsync(log2).ConfigureAwait(false); + } + } + message.isConsumed = true; + } + context.SaveChanges(); + transaction.Commit(); + } + catch (Exception ex) + { + Console.WriteLine(ex.ToString()); + transaction.Rollback(); + throw; + } + } +} + +/// +/// 消息表定时清理 +/// +public class VmiAsyncMessageService : Controller, IApplicationService, IJobService, ITransientDependency +{ + private readonly IServiceProvider _serviceProvider; + + public VmiAsyncMessageService(IServiceProvider serviceProvider) + { + this._serviceProvider = serviceProvider; + } + + [NonAction] + public Task Invoke(IServiceProvider serviceProvider) + { + using var scope = serviceProvider.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + db.Set().Where(o => o.isConsumed).BatchDelete(); + var count = db.Set().Where(o => !o.isConsumed).Count(); + scope.ServiceProvider.GetService>().Clients.All.ServerToClient("VmiBalance", count.ToString(), ""); + return Task.CompletedTask; + } + + /// + /// 未处理消息数量 + /// + /// + [HttpPost] + public int GetMessageCount() + { + using var scope = this._serviceProvider.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + var count = db.Set().Where(o => !o.isConsumed).Count(); + return count; + } +}