You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

157 lines
6.9 KiB

using Microsoft.Extensions.DependencyInjection;
using Quartz;
using Serilog;
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Wood.Data.Repository;
using Wood.Entity;
using Wood.Entity.SystemManage;
using Wood.EventBus;
using Wood.EventBus.Events;
using Wood.Util;
namespace Wood.AutoJob
{
/// <summary>
/// Base class for scheduled jobs. <see cref="Execute"/> is the Quartz entry point: it loads the
/// trigger row identified by the "TriggerId" entry of the job data map, decides from the trigger
/// status whether the job may run, invokes <see cref="Run"/> with retries, and persists both a
/// run-log row and the updated trigger row.
/// </summary>
public abstract class AutoJobTask : IJob
{
    /// <summary>
    /// The job's business logic. Derived classes override this; the default implementation throws.
    /// </summary>
    /// <param name="context">The Quartz execution context of the current firing.</param>
    /// <param name="provider">Scoped service provider created for this firing.</param>
    /// <returns>A task that completes when the job logic has finished.</returns>
    /// <exception cref="NotImplementedException">Always thrown by the default implementation.</exception>
    public virtual Task Run(IJobExecutionContext context, IServiceProvider provider)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Runs one firing of the job.
    /// <list type="bullet">
    /// <item>Trigger row not found: nothing is executed or written.</item>
    /// <item>Status Pause / Archived / Overrun / Panic: the run is skipped and logged with that status.</item>
    /// <item>Status Running: the trigger is marked Blocked and the run is skipped.</item>
    /// <item>End time already passed: the trigger is marked Archived and the run is skipped.</item>
    /// <item>Status Ready / ErrorToReady / Blocked: <see cref="Run"/> is invoked, retrying on exceptions.</item>
    /// <item>Any other status: <see cref="Run"/> is not invoked; the run is logged with the current status.</item>
    /// </list>
    /// </summary>
    /// <param name="context">The Quartz execution context of the current firing.</param>
    /// <returns>A task that completes once the log row and trigger row have been persisted.</returns>
    public async Task Execute(IJobExecutionContext context)
    {
        DateTime now = DateTime.Now;
        using var scope = GlobalContext.ServiceProvider!.CreateScope();
        var jobTriggerRepository = scope.ServiceProvider.GetRequiredService<SqlSugarRepository<JobTriggerEntity>>();
        var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();
        var logJobRepository = scope.ServiceProvider.GetRequiredService<SqlSugarRepository<LogJobEntity>>();

        JobDataMap jobData = context.JobDetail.JobDataMap;
        long triggerId = jobData["TriggerId"].ToString()?.ToLong() ?? 0;

        // Load the trigger definition from the database.
        JobTriggerEntity triggerEntity = await jobTriggerRepository.GetByIdAsync(triggerId);
        if (triggerEntity is null)
        {
            return;
        }

        var logJob = new LogJobEntity()
        {
            Elapsed = 0,
            TriggerId = triggerEntity.TriggerId,
            JobId = triggerEntity.JobId,
            RetryCount = 0,
            RunTime = now,
        };

        triggerEntity.LastRunTime = now;
        // Convert the next fire time to server-local time so it is expressed in the same
        // time base as LastRunTime / RunTime (which come from DateTime.Now), rather than
        // assuming a fixed UTC+8 offset.
        triggerEntity.NextRunTime = context.NextFireTimeUtc?.LocalDateTime;

        // The trigger is in a state that must not execute: only record the skipped run
        // and the refreshed run times.
        bool isSuspended = triggerEntity.Status == TriggerStatusEnum.Pause
            || triggerEntity.Status == TriggerStatusEnum.Archived
            || triggerEntity.Status == TriggerStatusEnum.Overrun
            || triggerEntity.Status == TriggerStatusEnum.Panic;
        if (isSuspended)
        {
            logJob.Status = triggerEntity.Status.GetDescription();
            await SaveRunAsync(logJobRepository, jobTriggerRepository, logJob, triggerEntity);
            return;
        }

        // The previous firing is still marked as running: flag this trigger as blocked.
        if (triggerEntity.Status == TriggerStatusEnum.Running)
        {
            triggerEntity.Status = TriggerStatusEnum.Blocked;
            logJob.Status = triggerEntity.Status.GetDescription();
            await SaveRunAsync(logJobRepository, jobTriggerRepository, logJob, triggerEntity);
            return;
        }

        // The trigger's end time has passed: archive it.
        if (triggerEntity.EndTime.HasValue && now > triggerEntity.EndTime.Value)
        {
            triggerEntity.Status = TriggerStatusEnum.Archived;
            logJob.Status = triggerEntity.Status.GetDescription();
            await SaveRunAsync(logJobRepository, jobTriggerRepository, logJob, triggerEntity);
            return;
        }

        triggerEntity.NumberOfRuns++;
        Stopwatch stopwatch = Stopwatch.StartNew();

        // Index of the attempt that ended the loop (equals maxAttempts when every attempt failed).
        int attempt = 0;
        // Falls back to the current trigger status when the job is not in a runnable state.
        string status = triggerEntity.Status.GetDescription();

        // Only these states are allowed to proceed to execution.
        bool isRunnable = triggerEntity.Status == TriggerStatusEnum.ErrorToReady
            || triggerEntity.Status == TriggerStatusEnum.Ready
            || triggerEntity.Status == TriggerStatusEnum.Blocked;
        if (isRunnable)
        {
            // Persist the Running state before executing so that an overlapping firing sees it.
            triggerEntity.Status = TriggerStatusEnum.Running;
            await jobTriggerRepository.UpdateAsync(triggerEntity);

            // Always execute at least once, even if NumRetries is zero or negative;
            // otherwise the trigger would be left in the Running state without ever running.
            int maxAttempts = Math.Max(1, triggerEntity.NumRetries);
            for (; attempt < maxAttempts; attempt++)
            {
                try
                {
                    await Run(context, scope.ServiceProvider);
                    status = "执行完成";
                    if (triggerEntity.MaxNumberOfRuns > 0 && triggerEntity.MaxNumberOfRuns <= triggerEntity.NumberOfRuns)
                    {
                        triggerEntity.Status = TriggerStatusEnum.Overrun;
                    }
                    else
                    {
                        triggerEntity.Status = TriggerStatusEnum.Ready;
                    }
                    break;
                }
                catch (Exception ex)
                {
                    status = "执行出错";
                    // Publish the failure so it is recorded by the exception-log subscriber.
                    eventBus.Publish(LogExceptionEvent.NewEvent(ex, triggerEntity.JobId!, triggerEntity.TriggerId!, "Execute"));
                    triggerEntity.NumberOfErrors++;
                    if (triggerEntity.MaxNumberOfErrors > 0 && triggerEntity.MaxNumberOfErrors < triggerEntity.NumberOfErrors)
                    {
                        triggerEntity.Status = TriggerStatusEnum.Panic;
                    }
                    else
                    {
                        triggerEntity.Status = TriggerStatusEnum.ErrorToReady;
                    }
                    Log.Error(ex, $"执行job出错!【{triggerEntity.JobId}】");
                }

                // Wait before the next attempt; there is nothing to wait for after the last one.
                if (attempt + 1 < maxAttempts)
                {
                    await Task.Delay(triggerEntity.RetryTimeout);
                }
            }
        }

        stopwatch.Stop();
        logJob.Elapsed = stopwatch.ElapsedMilliseconds;
        // Record the actual outcome (completed / failed / not runnable) instead of a fixed text.
        logJob.Status = status;
        logJob.RetryCount = attempt;
        await SaveRunAsync(logJobRepository, jobTriggerRepository, logJob, triggerEntity);
    }

    /// <summary>
    /// Inserts the run-log row and then updates the trigger row.
    /// </summary>
    private static async Task SaveRunAsync(
        SqlSugarRepository<LogJobEntity> logJobRepository,
        SqlSugarRepository<JobTriggerEntity> jobTriggerRepository,
        LogJobEntity logJob,
        JobTriggerEntity triggerEntity)
    {
        await logJobRepository.InsertAsync(logJob);
        await jobTriggerRepository.UpdateAsync(triggerEntity);
    }
}
}