学 赵
1 year ago
10 changed files with 613 additions and 577 deletions
@ -0,0 +1,173 @@ |
|||||
|
using System; |
||||
|
using System.Data.SqlClient; |
||||
|
using System.Linq; |
||||
|
using System.Linq.Dynamic.Core; |
||||
|
using System.Text.Json; |
||||
|
using System.Threading.Tasks; |
||||
|
using EFCore.BulkExtensions; |
||||
|
using Magicodes.ExporterAndImporter.Core.Extension; |
||||
|
using Microsoft.AspNetCore.Mvc; |
||||
|
using Microsoft.AspNetCore.SignalR; |
||||
|
using Microsoft.EntityFrameworkCore; |
||||
|
using Microsoft.Extensions.Configuration; |
||||
|
using Microsoft.Extensions.DependencyInjection; |
||||
|
using Omu.ValueInjecter; |
||||
|
using SettleAccount.Job.SignalR; |
||||
|
using Volo.Abp.Application.Services; |
||||
|
using Volo.Abp.DependencyInjection; |
||||
|
using Win.Sfs.SettleAccount.Entities.BQ.Vmi; |
||||
|
using Win.Sfs.Shared.RepositoryBase; |
||||
|
|
||||
|
namespace Win.Sfs.SettleAccount.Entities.BQ; |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// 异步更新库存
|
||||
|
/// </summary>
|
||||
|
public class VmiAsyncBalanceService : ApplicationService, IJobService, ITransientDependency |
||||
|
{ |
||||
|
private readonly IServiceProvider _serviceProvider; |
||||
|
|
||||
|
public VmiAsyncBalanceService(IServiceProvider serviceProvider) |
||||
|
{ |
||||
|
this._serviceProvider = serviceProvider; |
||||
|
} |
||||
|
|
||||
|
public async Task Invoke(IServiceProvider serviceProvider) |
||||
|
{ |
||||
|
var connectionString = serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); |
||||
|
using var connection = new SqlConnection(connectionString); |
||||
|
connection.Open(); |
||||
|
using var transaction = connection.BeginTransaction(); |
||||
|
try |
||||
|
{ |
||||
|
var command = connection.CreateCommand(); |
||||
|
command.Transaction = transaction; |
||||
|
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connection).Options; |
||||
|
using var context = new SettleAccountDbContext(options); |
||||
|
context.Database.UseTransaction(transaction); |
||||
|
var messages = context.Set<VmiMessage>().Where(o => !o.isConsumed).OrderBy(o => o.Number).Take(1000 * 100).ToList(); |
||||
|
var repo = context.Set<VmiBalance>(); |
||||
|
foreach (var message in messages) |
||||
|
{ |
||||
|
var log = JsonSerializer.Deserialize<VmiLog>(message.Message); |
||||
|
log.SetId(Guid.Parse(JsonSerializer.Deserialize<JsonElement>(message.Message).GetProperty("Id").GetString())); |
||||
|
//插入分表
|
||||
|
var table = $"Set_VmiLog_{log.ChangedTime.Year}_{(log.ChangedTime.Month - 1) / 3 + 1}"; |
||||
|
command.CommandText = $"select OBJECT_ID('{table}', 'U')"; |
||||
|
var result = command.ExecuteScalar().ToString(); |
||||
|
if (result == string.Empty) |
||||
|
{ |
||||
|
command.CommandText = $"select * into {table} from Set_VmiLog where 1=0;"; |
||||
|
command.ExecuteNonQuery(); |
||||
|
command.CommandText = $"create clustered index IX_{table}_ChangedTime on {table} (ChangedTime);"; |
||||
|
command.ExecuteNonQuery(); |
||||
|
command.CommandText = $"alter table {table} add constraint PK_{table} primary key (Id);"; |
||||
|
command.ExecuteNonQuery(); |
||||
|
} |
||||
|
//插入到分表
|
||||
|
command.CommandText = $"insert into {table} select * from Set_VmiLog where id ='{log.Id}'"; |
||||
|
command.ExecuteNonQuery(); |
||||
|
//插入库存
|
||||
|
var balance = context.Set<VmiBalance>().FirstOrDefault( |
||||
|
o => o.DeliverBillType == log.DeliverBillType && |
||||
|
o.CodeType == log.CodeType && |
||||
|
o.DeliverBillType == log.DeliverBillType && |
||||
|
o.VinCode == log.VinCode && |
||||
|
o.ErpToLoc == log.ErpToLoc && |
||||
|
o.OrderNum == log.OrderNum && |
||||
|
o.factory == log.factory && |
||||
|
o.Configcode == log.Configcode); |
||||
|
if (balance == null) |
||||
|
{ |
||||
|
balance = new VmiBalance(GuidGenerator.Create()); |
||||
|
balance.InjectFrom(log); |
||||
|
await repo.AddAsync(balance).ConfigureAwait(false); |
||||
|
log.InjectFrom(balance); |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
var logType = log.LogType; |
||||
|
|
||||
|
var qty = balance.Qty;// + log.ty
|
||||
|
if (logType == VmiLogType.Type100) |
||||
|
{ |
||||
|
//发运入库,负库存字段需要更新
|
||||
|
if (balance.Qty < decimal.Zero) |
||||
|
{ |
||||
|
balance.InjectFrom(log); |
||||
|
} |
||||
|
} |
||||
|
else if (logType == VmiLogType.Type300) |
||||
|
{ |
||||
|
//反结入库,只更新库存
|
||||
|
} |
||||
|
else if (logType == VmiLogType.Type500) |
||||
|
{ |
||||
|
//调整入库,更新库存和其他字段
|
||||
|
balance.InjectFrom(log); |
||||
|
} |
||||
|
// 更新库存
|
||||
|
balance.Qty = qty; |
||||
|
if (balance.Qty == decimal.Zero) |
||||
|
{ |
||||
|
//删除0库存
|
||||
|
repo.Remove(balance); |
||||
|
} |
||||
|
if (logType == VmiLogType.Type100 && balance.Qty < decimal.Zero && log.Qty > 0) |
||||
|
{ |
||||
|
//添加负库存补货记录
|
||||
|
var log2 = new VmiReplenished(); |
||||
|
log2.InjectFrom(log); |
||||
|
await context.Set<VmiReplenished>().AddAsync(log2).ConfigureAwait(false); |
||||
|
} |
||||
|
} |
||||
|
message.isConsumed = true; |
||||
|
} |
||||
|
context.SaveChanges(); |
||||
|
transaction.Commit(); |
||||
|
} |
||||
|
catch (Exception ex) |
||||
|
{ |
||||
|
Console.WriteLine(ex.ToString()); |
||||
|
transaction.Rollback(); |
||||
|
throw; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// 消息表定时清理
|
||||
|
/// </summary>
|
||||
|
public class VmiAsyncMessageService : Controller, IApplicationService, IJobService, ITransientDependency |
||||
|
{ |
||||
|
private readonly IServiceProvider _serviceProvider; |
||||
|
|
||||
|
public VmiAsyncMessageService(IServiceProvider serviceProvider) |
||||
|
{ |
||||
|
this._serviceProvider = serviceProvider; |
||||
|
} |
||||
|
|
||||
|
[NonAction] |
||||
|
public Task Invoke(IServiceProvider serviceProvider) |
||||
|
{ |
||||
|
using var scope = serviceProvider.CreateScope(); |
||||
|
var db = scope.ServiceProvider.GetRequiredService<SettleAccountDbContext>(); |
||||
|
db.Set<VmiMessage>().Where(o => o.isConsumed).BatchDelete(); |
||||
|
var count = db.Set<VmiMessage>().Where(o => !o.isConsumed).Count(); |
||||
|
scope.ServiceProvider.GetService<IHubContext<PageHub>>().Clients.All.ServerToClient("VmiBalance", count.ToString(), ""); |
||||
|
return Task.CompletedTask; |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// 未处理消息数量
|
||||
|
/// </summary>
|
||||
|
/// <returns></returns>
|
||||
|
[HttpPost] |
||||
|
public int GetMessageCount() |
||||
|
{ |
||||
|
using var scope = this._serviceProvider.CreateScope(); |
||||
|
var db = scope.ServiceProvider.GetRequiredService<SettleAccountDbContext>(); |
||||
|
var count = db.Set<VmiMessage>().Where(o => !o.isConsumed).Count(); |
||||
|
return count; |
||||
|
} |
||||
|
} |
Loading…
Reference in new issue