diff --git a/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/JobItemAppService.cs b/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/JobItemAppService.cs index e8e06f11..6a6f59b9 100644 --- a/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/JobItemAppService.cs +++ b/code/src/Modules/SettleAccount/src/SettleAccount.Application/Entities/BQ/JobItemAppService.cs @@ -1,27 +1,202 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Linq.Dynamic.Core; +using System.Threading; using System.Threading.Tasks; -using EFCore.BulkExtensions; +using Cronos; using Microsoft.AspNetCore.Mvc; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Omu.ValueInjecter; using Volo.Abp.Application.Dtos; +using Volo.Abp.Application.Services; +using Volo.Abp.DependencyInjection; using Win.Sfs.SettleAccount.Entities.BQ.Dtos; using Win.Sfs.SettleAccount.Entities.BQ.Vmi; -using Win.Sfs.SettleAccount.Entities.Materials; using Win.Sfs.Shared.RepositoryBase; namespace Win.Sfs.SettleAccount.Entities.BQ; +public class JobHostdService : BackgroundService +{ + private readonly object _lockObj = new object(); + private readonly IServiceProvider _serviceProvider; + private CancellationToken _stoppingToken; + + public JobHostdService(IServiceProvider serviceProvider) + { + this._serviceProvider = serviceProvider; + } + + public ConcurrentDictionary> Jobs { get; } = new(); + + public static List ServiceTypes { get; private set; } + + public static void AddService(IServiceCollection services) + { + ServiceTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(o => o.GetTypes()) + .Where(o => o.IsClass && !o.IsAbstract && o.IsAssignableTo(typeof(IJobService))) + .ToList(); + ServiceTypes.ForEach(o => + { + services.AddTransient(typeof(IJobService), o); + services.AddTransient(o); + }); + } + + public void AddJob(JobItem job) + { + lock (_lockObj) + { + if (!Jobs.Keys.Any(o => o.Id == job.Id)) + { + try + { + var source = new CancellationTokenSource(); + var token = source.Token; + var thread = new Thread(async () => + { + var expression = CronExpression.Parse(job.Cron); + using var scope1 = this._serviceProvider.CreateScope(); + var serviceType = ServiceTypes.FirstOrDefault(o => o.FullName == job.Service); + if (serviceType != null) + { + while (!_stoppingToken.IsCancellationRequested && !token.IsCancellationRequested) + { + + try + { + var nextUtc = expression.GetNextOccurrence(DateTime.UtcNow); + var span = nextUtc - DateTime.UtcNow; + await Task.Delay(span.Value).ConfigureAwait(false); + using var scope = this._serviceProvider.CreateScope(); + if (scope.ServiceProvider.GetService(serviceType) is IJobService jobService) + { + var db = scope.ServiceProvider.GetService(); + var jobItemRepository = db.Set(); + var jobLogRepository = db.Set(); + var jobItem = jobItemRepository.FirstOrDefault(o => o.Id == job.Id); + if (!jobItem.IsDisabled) + { + jobItem.IsRunning = true; + db.SaveChanges(); + var jobLog = new JobLog { JobId = job.Id, Start = DateTime.Now }; + try + { + jobService.Invoke(); + jobLog.Success = true; + } + catch (Exception ex) + { + Console.WriteLine(ex.ToString()); + jobLog.Exception = ex.ToString(); + } + finally + { + jobLog.End = DateTime.Now; + jobLogRepository.Add(jobLog); + jobItem.IsRunning = false; + db.SaveChanges(); + } + } + } + } + catch (Exception) + { + throw; + } + } + } + }); + thread.IsBackground = true; + if (this.Jobs.TryAdd(job, new Tuple(source, thread))) + { + thread.Start(); + } + } + catch (Exception ex) + { + Console.WriteLine(ex.ToString()); + } + } + } + } + + public void RemoveJob(JobItem item) + { + lock (_lockObj) + { + var key = this.Jobs.Keys.FirstOrDefault(o => o.Id == item.Id); + if (key != null) + { + try + { + if (this.Jobs.TryRemove(key, out var value)) + { + value.Item1.Cancel(); + value.Item2.Interrupt(); + value.Item2.Join(); + } + } + catch (Exception ex) + { + Console.WriteLine(ex.ToString()); + } + } + } + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + this._stoppingToken = stoppingToken; + using var scope = this._serviceProvider.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + var jobs = db.Set().AsNoTracking().ToList(); + jobs.ForEach(this.AddJob); + return Task.CompletedTask; + } +} + [Route("api/settleaccount/[controller]/[action]")] -public class JobItemAppService : SettleAccountApplicationBase +public class JobItemAppService : ApplicationService, ITransientDependency { private readonly INormalEfCoreRepository _repository; + private readonly JobHostdService _jobHostdService; - public JobItemAppService(INormalEfCoreRepository repository) + public JobItemAppService(INormalEfCoreRepository repository, JobHostdService jobHostdService) { this._repository = repository; + this._jobHostdService = jobHostdService; + } + + [HttpPost] + public async Task CreateAsync(JobItem input) + { + var entity = await _repository.InsertAsync(input).ConfigureAwait(false); + this._jobHostdService.AddJob(entity); + return entity; + } + + [HttpPost] + public async Task DeleteListAsync(List ids) + { + var entites = _repository.Where(p => ids.Contains(p.Id)).ToList(); + foreach (var item in entites) + { + await _repository.DeleteAsync(item).ConfigureAwait(false); + this._jobHostdService.RemoveJob(item); + } + return entites.Count > 0; + } + + [HttpPost("{id}")] + public async Task DetailsAsync(Guid id) + { + var entity = await _repository.FindAsync(id).ConfigureAwait(false); + return entity; } [HttpPost] @@ -33,13 +208,6 @@ public class JobItemAppService : SettleAccountApplicationBase return new PagedResultDto(totalCount, entities); } - [HttpPost] - public async Task CreateAsync(JobItem input) - { - await _repository.InsertAsync(input).ConfigureAwait(false); - return input; - } - [HttpPost("{id}")] public async Task UpdateAsync(Guid id, JobItem input) { @@ -47,22 +215,16 @@ public class JobItemAppService : SettleAccountApplicationBase if (entity != null) { entity.InjectFrom(input); + await _repository.UpdateAsync(entity).ConfigureAwait(false); + this._jobHostdService.RemoveJob(entity); + this._jobHostdService.AddJob(entity); } - await _repository.UpdateAsync(entity).ConfigureAwait(false); return input; } - - [HttpPost] - public async Task DeleteListAsync(List ids) - { - var _query = _repository.Where(p => ids.Contains(p.Id)); - int i = await _query.BatchDeleteAsync().ConfigureAwait(false); - return i == 0; - } } [Route("api/settleaccount/[controller]/[action]")] -public class JobLogAppService : SettleAccountApplicationBase +public class JobLogAppService : ApplicationService, ITransientDependency { private readonly INormalEfCoreRepository _repository; diff --git a/code/src/Modules/SettleAccount/src/SettleAccount.Application/SettleAccount.Application.csproj b/code/src/Modules/SettleAccount/src/SettleAccount.Application/SettleAccount.Application.csproj index 2a809cee..a3ff4858 100644 --- a/code/src/Modules/SettleAccount/src/SettleAccount.Application/SettleAccount.Application.csproj +++ b/code/src/Modules/SettleAccount/src/SettleAccount.Application/SettleAccount.Application.csproj @@ -117,6 +117,7 @@ + diff --git a/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/JobItem.cs b/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/JobItem.cs index e00008e7..784afbc4 100644 --- a/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/JobItem.cs +++ b/code/src/Modules/SettleAccount/src/SettleAccount.Domain/Entities/BQ/Vmi/JobItem.cs @@ -1,4 +1,5 @@ using System; +using System.ComponentModel.DataAnnotations; using Volo.Abp.Domain.Entities; namespace Win.Sfs.SettleAccount.Entities.BQ.Vmi; @@ -11,9 +12,16 @@ public class JobItem : Entity, IHasConcurrencyStamp } public bool IsDisabled { get; set; } + + [Required] public string Name { get; set; } + + [Required] public string Cron { get; set; } + + [Required] public string Service { get; set; } + public bool IsRunning { get; set; } public string ConcurrencyStamp { get; set; } }