Browse Source

update

master
wanggang 1 year ago
parent
commit
d32061ac1a
  1. 204
      code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/JobItemAppService.cs
  2. 1
      code/src/Modules/SettleAccount/src/SettleAccount.Application/SettleAccount.Application.csproj
  3. 8
      code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/JobItem.cs

204
code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/JobItemAppService.cs

@ -1,27 +1,202 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Dynamic.Core;
using System.Threading;
using System.Threading.Tasks;
using EFCore.BulkExtensions;
using Cronos;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Omu.ValueInjecter;
using Volo.Abp.Application.Dtos;
using Volo.Abp.Application.Services;
using Volo.Abp.DependencyInjection;
using Win.Sfs.SettleAccount.Entities.BQ.Dtos;
using Win.Sfs.SettleAccount.Entities.BQ.Vmi;
using Win.Sfs.SettleAccount.Entities.Materials;
using Win.Sfs.Shared.RepositoryBase;
namespace Win.Sfs.SettleAccount.Entities.BQ;
public class JobHostdService : BackgroundService
{
private readonly object _lockObj = new object();
private readonly IServiceProvider _serviceProvider;
private CancellationToken _stoppingToken;
public JobHostdService(IServiceProvider serviceProvider)
{
this._serviceProvider = serviceProvider;
}
public ConcurrentDictionary<JobItem, Tuple<CancellationTokenSource, Thread>> Jobs { get; } = new();
public static List<Type> ServiceTypes { get; private set; }
public static void AddService(IServiceCollection services)
{
ServiceTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(o => o.GetTypes())
.Where(o => o.IsClass && !o.IsAbstract && o.IsAssignableTo(typeof(IJobService)))
.ToList();
ServiceTypes.ForEach(o =>
{
services.AddTransient(typeof(IJobService), o);
services.AddTransient(o);
});
}
public void AddJob(JobItem job)
{
lock (_lockObj)
{
if (!Jobs.Keys.Any(o => o.Id == job.Id))
{
try
{
var source = new CancellationTokenSource();
var token = source.Token;
var thread = new Thread(async () =>
{
var expression = CronExpression.Parse(job.Cron);
using var scope1 = this._serviceProvider.CreateScope();
var serviceType = ServiceTypes.FirstOrDefault(o => o.FullName == job.Service);
if (serviceType != null)
{
while (!_stoppingToken.IsCancellationRequested && !token.IsCancellationRequested)
{
try
{
var nextUtc = expression.GetNextOccurrence(DateTime.UtcNow);
var span = nextUtc - DateTime.UtcNow;
await Task.Delay(span.Value).ConfigureAwait(false);
using var scope = this._serviceProvider.CreateScope();
if (scope.ServiceProvider.GetService(serviceType) is IJobService jobService)
{
var db = scope.ServiceProvider.GetService<SettleAccountDbContext>();
var jobItemRepository = db.Set<JobItem>();
var jobLogRepository = db.Set<JobLog>();
var jobItem = jobItemRepository.FirstOrDefault(o => o.Id == job.Id);
if (!jobItem.IsDisabled)
{
jobItem.IsRunning = true;
db.SaveChanges();
var jobLog = new JobLog { JobId = job.Id, Start = DateTime.Now };
try
{
jobService.Invoke();
jobLog.Success = true;
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
jobLog.Exception = ex.ToString();
}
finally
{
jobLog.End = DateTime.Now;
jobLogRepository.Add(jobLog);
jobItem.IsRunning = false;
db.SaveChanges();
}
}
}
}
catch (Exception)
{
throw;
}
}
}
});
thread.IsBackground = true;
if (this.Jobs.TryAdd(job, new Tuple<CancellationTokenSource, Thread>(source, thread)))
{
thread.Start();
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
}
}
public void RemoveJob(JobItem item)
{
lock (_lockObj)
{
var key = this.Jobs.Keys.FirstOrDefault(o => o.Id == item.Id);
if (key != null)
{
try
{
if (this.Jobs.TryRemove(key, out var value))
{
value.Item1.Cancel();
value.Item2.Interrupt();
value.Item2.Join();
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
}
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
this._stoppingToken = stoppingToken;
using var scope = this._serviceProvider.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<SettleAccountDbContext>();
var jobs = db.Set<JobItem>().AsNoTracking().ToList();
jobs.ForEach(this.AddJob);
return Task.CompletedTask;
}
}
[Route("api/settleaccount/[controller]/[action]")]
public class JobItemAppService : SettleAccountApplicationBase<Material>
public class JobItemAppService : ApplicationService, ITransientDependency
{
private readonly INormalEfCoreRepository<JobItem, Guid> _repository;
private readonly JobHostdService _jobHostdService;
public JobItemAppService(INormalEfCoreRepository<JobItem, Guid> repository)
public JobItemAppService(INormalEfCoreRepository<JobItem, Guid> repository, JobHostdService jobHostdService)
{
this._repository = repository;
this._jobHostdService = jobHostdService;
}
[HttpPost]
public async Task<JobItem> CreateAsync(JobItem input)
{
var entity = await _repository.InsertAsync(input).ConfigureAwait(false);
this._jobHostdService.AddJob(entity);
return entity;
}
[HttpPost]
public async Task<bool> DeleteListAsync(List<Guid> ids)
{
var entites = _repository.Where(p => ids.Contains(p.Id)).ToList();
foreach (var item in entites)
{
await _repository.DeleteAsync(item).ConfigureAwait(false);
this._jobHostdService.RemoveJob(item);
}
return entites.Count > 0;
}
[HttpPost("{id}")]
public async Task<JobItem> DetailsAsync(Guid id)
{
var entity = await _repository.FindAsync(id).ConfigureAwait(false);
return entity;
}
[HttpPost]
@ -33,13 +208,6 @@ public class JobItemAppService : SettleAccountApplicationBase<Material>
return new PagedResultDto<JobItem>(totalCount, entities);
}
[HttpPost]
public async Task<JobItem> CreateAsync(JobItem input)
{
await _repository.InsertAsync(input).ConfigureAwait(false);
return input;
}
[HttpPost("{id}")]
public async Task<JobItem> UpdateAsync(Guid id, JobItem input)
{
@ -47,22 +215,16 @@ public class JobItemAppService : SettleAccountApplicationBase<Material>
if (entity != null)
{
entity.InjectFrom(input);
await _repository.UpdateAsync(entity).ConfigureAwait(false);
this._jobHostdService.RemoveJob(entity);
this._jobHostdService.AddJob(entity);
}
await _repository.UpdateAsync(entity).ConfigureAwait(false);
return input;
}
[HttpPost]
public async Task<bool> DeleteListAsync(List<Guid> ids)
{
var _query = _repository.Where(p => ids.Contains(p.Id));
int i = await _query.BatchDeleteAsync().ConfigureAwait(false);
return i == 0;
}
}
[Route("api/settleaccount/[controller]/[action]")]
public class JobLogAppService : SettleAccountApplicationBase<Material>
public class JobLogAppService : ApplicationService, ITransientDependency
{
private readonly INormalEfCoreRepository<JobLog, Guid> _repository;

1
code/src/Modules/SettleAccount/src/SettleAccount.Application/SettleAccount.Application.csproj

@ -117,6 +117,7 @@
<ItemGroup>
<PackageReference Include="ClosedXML" Version="0.102.0" />
<PackageReference Include="Cronos" Version="0.7.1" />
<PackageReference Include="EFCore.BulkExtensions" Version="5.3.0" />
<PackageReference Include="Flurl" Version="3.0.7" />
<PackageReference Include="Flurl.Http" Version="3.2.4" />

8
code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/JobItem.cs

@ -1,4 +1,5 @@
using System;
using System.ComponentModel.DataAnnotations;
using Volo.Abp.Domain.Entities;
namespace Win.Sfs.SettleAccount.Entities.BQ.Vmi;
@ -11,9 +12,16 @@ public class JobItem : Entity<Guid>, IHasConcurrencyStamp
}
public bool IsDisabled { get; set; }
[Required]
public string Name { get; set; }
[Required]
public string Cron { get; set; }
[Required]
public string Service { get; set; }
public bool IsRunning { get; set; }
public string ConcurrencyStamp { get; set; }
}

Loading…
Cancel
Save