|
@ -68,13 +68,19 @@ namespace Win.Sfs.SettleAccount.Entities.BQ |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public static List<Type> ServiceTypes { get; private set; } |
|
|
public static List<Type> ServiceTypes { get; private set; } |
|
|
|
|
|
|
|
|
|
|
|
// Jobs字段使用ConcurrentDictionary来存储JobItem、Tuple<CancellationTokenSource, Thread>类型的数据
|
|
|
public ConcurrentDictionary<JobItem, Tuple<CancellationTokenSource, Thread>> Jobs { get; } = new(); |
|
|
public ConcurrentDictionary<JobItem, Tuple<CancellationTokenSource, Thread>> Jobs { get; } = new(); |
|
|
|
|
|
|
|
|
|
|
|
// 用于添加服务到IServiceCollection中
|
|
|
public static void AddService(IServiceCollection services) |
|
|
public static void AddService(IServiceCollection services) |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 获取所有实现了IJobService接口的类,并将其添加到ServiceTypes列表中
|
|
|
ServiceTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(o => o.GetTypes()) |
|
|
ServiceTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(o => o.GetTypes()) |
|
|
.Where(o => o.IsClass && !o.IsAbstract && o.IsAssignableTo(typeof(IJobService))) |
|
|
.Where(o => o.IsClass && !o.IsAbstract && o.IsAssignableTo(typeof(IJobService))) |
|
|
.ToList(); |
|
|
.ToList(); |
|
|
|
|
|
|
|
|
|
|
|
// 遍历ServiceTypes列表中的类型,并分别向services中注册IJobService接口和实现类
|
|
|
ServiceTypes.ForEach(o => |
|
|
ServiceTypes.ForEach(o => |
|
|
{ |
|
|
{ |
|
|
services.AddTransient(typeof(IJobService), o); |
|
|
services.AddTransient(typeof(IJobService), o); |
|
@ -82,28 +88,37 @@ namespace Win.Sfs.SettleAccount.Entities.BQ |
|
|
}); |
|
|
}); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public void AddJob(JobItem job) |
|
|
public void AddJob(JobItem job) |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 使用锁确保线程安全
|
|
|
lock (_lockObj) |
|
|
lock (_lockObj) |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 检查是否存在相同Id的作业
|
|
|
if (!Jobs.Keys.Any(o => o.Id == job.Id)) |
|
|
if (!Jobs.Keys.Any(o => o.Id == job.Id)) |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 尝试创建新作业的线程
|
|
|
try |
|
|
try |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 创建取消标记和线程
|
|
|
var source = new CancellationTokenSource(); |
|
|
var source = new CancellationTokenSource(); |
|
|
var token = source.Token; |
|
|
var token = source.Token; |
|
|
var thread = new Thread(async () => |
|
|
var thread = new Thread(async () => |
|
|
{ |
|
|
{ |
|
|
try |
|
|
try |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 解析Cron表达式
|
|
|
var expression = CronExpression.Parse(job.Cron, job.Cron.Split(' ').Length == 5 ? CronFormat.Standard : CronFormat.IncludeSeconds); |
|
|
var expression = CronExpression.Parse(job.Cron, job.Cron.Split(' ').Length == 5 ? CronFormat.Standard : CronFormat.IncludeSeconds); |
|
|
|
|
|
// 获取作业服务类型
|
|
|
var serviceType = ServiceTypes.FirstOrDefault(o => o.FullName == job.Service); |
|
|
var serviceType = ServiceTypes.FirstOrDefault(o => o.FullName == job.Service); |
|
|
if (serviceType != null) |
|
|
if (serviceType != null) |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 循环执行作业直到取消标记被设置
|
|
|
while (!_stoppingToken.IsCancellationRequested && !token.IsCancellationRequested) |
|
|
while (!_stoppingToken.IsCancellationRequested && !token.IsCancellationRequested) |
|
|
{ |
|
|
{ |
|
|
try |
|
|
try |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 计算下一次执行时间间隔
|
|
|
var now = DateTime.UtcNow; |
|
|
var now = DateTime.UtcNow; |
|
|
var nextUtc = expression.GetNextOccurrence(now); |
|
|
var nextUtc = expression.GetNextOccurrence(now); |
|
|
var span = nextUtc - now; |
|
|
var span = nextUtc - now; |
|
@ -117,6 +132,7 @@ namespace Win.Sfs.SettleAccount.Entities.BQ |
|
|
await Task.Delay(currentDelay).ConfigureAwait(false); |
|
|
await Task.Delay(currentDelay).ConfigureAwait(false); |
|
|
} |
|
|
} |
|
|
Debug.WriteLine($"{job.Name} 定时任务开始执行"); |
|
|
Debug.WriteLine($"{job.Name} 定时任务开始执行"); |
|
|
|
|
|
// 创建作用域并获取作业服务实例
|
|
|
using var scope = this._serviceProvider.CreateScope(); |
|
|
using var scope = this._serviceProvider.CreateScope(); |
|
|
if (scope.ServiceProvider.GetService(serviceType) is IJobService jobService) |
|
|
if (scope.ServiceProvider.GetService(serviceType) is IJobService jobService) |
|
|
{ |
|
|
{ |
|
@ -124,6 +140,7 @@ namespace Win.Sfs.SettleAccount.Entities.BQ |
|
|
{ |
|
|
{ |
|
|
var jobItem = this.GetJobItem(job.Id); |
|
|
var jobItem = this.GetJobItem(job.Id); |
|
|
Guid? jobLogId = null; |
|
|
Guid? jobLogId = null; |
|
|
|
|
|
// 如果作业正在运行并且心跳超过20秒,则停止作业
|
|
|
if (jobItem.IsRunning && (DateTime.Now - jobItem.HeartBeat.Value).TotalSeconds > 20) |
|
|
if (jobItem.IsRunning && (DateTime.Now - jobItem.HeartBeat.Value).TotalSeconds > 20) |
|
|
{ |
|
|
{ |
|
|
JobItemStop(jobItem.Id); |
|
|
JobItemStop(jobItem.Id); |
|
@ -131,13 +148,16 @@ namespace Win.Sfs.SettleAccount.Entities.BQ |
|
|
if (!jobItem.IsRunning) |
|
|
if (!jobItem.IsRunning) |
|
|
{ |
|
|
{ |
|
|
jobLogId = this.JobItemStart(job.Id); |
|
|
jobLogId = this.JobItemStart(job.Id); |
|
|
|
|
|
// 创建定时器以发送心跳
|
|
|
using var timer = new System.Timers.Timer(10000); |
|
|
using var timer = new System.Timers.Timer(10000); |
|
|
if (jobLogId.HasValue) |
|
|
if (jobLogId.HasValue) |
|
|
{ |
|
|
{ |
|
|
try |
|
|
try |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 执行作业服务方法
|
|
|
timer.Elapsed += (s, e) => JobItemHeartBeat(job.Id); |
|
|
timer.Elapsed += (s, e) => JobItemHeartBeat(job.Id); |
|
|
timer.Start(); |
|
|
timer.Start(); |
|
|
|
|
|
// 通知客户端作业状态变化
|
|
|
scope.ServiceProvider.GetRequiredService<IHubContext<PageHub>>().Clients.All.ServerToClient("JobItem", "refresh", ""); |
|
|
scope.ServiceProvider.GetRequiredService<IHubContext<PageHub>>().Clients.All.ServerToClient("JobItem", "refresh", ""); |
|
|
await jobService.Invoke(scope.ServiceProvider).ConfigureAwait(false); |
|
|
await jobService.Invoke(scope.ServiceProvider).ConfigureAwait(false); |
|
|
this.JobItemSuccess(job.Id, jobLogId.Value); |
|
|
this.JobItemSuccess(job.Id, jobLogId.Value); |
|
@ -152,6 +172,7 @@ namespace Win.Sfs.SettleAccount.Entities.BQ |
|
|
finally |
|
|
finally |
|
|
{ |
|
|
{ |
|
|
timer.Stop(); |
|
|
timer.Stop(); |
|
|
|
|
|
// 通知客户端作业状态变化
|
|
|
scope.ServiceProvider.GetRequiredService<IHubContext<PageHub>>().Clients.All.ServerToClient("JobItem", "refresh", ""); |
|
|
scope.ServiceProvider.GetRequiredService<IHubContext<PageHub>>().Clients.All.ServerToClient("JobItem", "refresh", ""); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -179,6 +200,7 @@ namespace Win.Sfs.SettleAccount.Entities.BQ |
|
|
{ |
|
|
{ |
|
|
IsBackground = true |
|
|
IsBackground = true |
|
|
}; |
|
|
}; |
|
|
|
|
|
// 尝试将作业添加到字典中并启动线程
|
|
|
if (this.Jobs.TryAdd(job, new Tuple<CancellationTokenSource, Thread>(source, thread))) |
|
|
if (this.Jobs.TryAdd(job, new Tuple<CancellationTokenSource, Thread>(source, thread))) |
|
|
{ |
|
|
{ |
|
|
thread.Start(); |
|
|
thread.Start(); |
|
@ -193,17 +215,26 @@ namespace Win.Sfs.SettleAccount.Entities.BQ |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void JobItemFaild(Guid id, Guid jobLogId, Exception ex) |
|
|
private void JobItemFaild(Guid id, Guid jobLogId, Exception ex) |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 获取连接字符串
|
|
|
var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); |
|
|
var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); |
|
|
|
|
|
// 使用连接字符串创建数据库连接
|
|
|
using var connection = new SqlConnection(connectionString); |
|
|
using var connection = new SqlConnection(connectionString); |
|
|
|
|
|
// 配置数据库上下文选项
|
|
|
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connection).Options; |
|
|
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connection).Options; |
|
|
|
|
|
// 使用数据库上下文实例化数据库上下文
|
|
|
using var db = new SettleAccountDbContext(options); |
|
|
using var db = new SettleAccountDbContext(options); |
|
|
|
|
|
// 获取指定 id 对应的 JobItem 实体
|
|
|
var entity = db.Set<JobItem>().FirstOrDefault(o => o.Id == id); |
|
|
var entity = db.Set<JobItem>().FirstOrDefault(o => o.Id == id); |
|
|
if (entity != null) |
|
|
if (entity != null) |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 更新实体的 IsRunning 属性,并保存到数据库
|
|
|
entity.IsRunning = false; |
|
|
entity.IsRunning = false; |
|
|
|
|
|
// 获取指定 jobLogId 对应的 JobLog 实体
|
|
|
var log = db.Set<JobLog>().FirstOrDefault(o => o.Id == jobLogId); |
|
|
var log = db.Set<JobLog>().FirstOrDefault(o => o.Id == jobLogId); |
|
|
|
|
|
// 更新 JobLog 实体的 End、Success 和 Exception 属性,并保存到数据库
|
|
|
log.End = DateTime.Now; |
|
|
log.End = DateTime.Now; |
|
|
log.Success = false; |
|
|
log.Success = false; |
|
|
log.Exception = ex.ToString(); |
|
|
log.Exception = ex.ToString(); |
|
@ -211,61 +242,103 @@ namespace Win.Sfs.SettleAccount.Entities.BQ |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 在指定的作业项成功时更新数据库
|
|
|
private void JobItemSuccess(Guid id, Guid jobLogId) |
|
|
private void JobItemSuccess(Guid id, Guid jobLogId) |
|
|
{ |
|
|
{ |
|
|
var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); |
|
|
var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); |
|
|
using var connection = new SqlConnection(connectionString); |
|
|
using var connection = new SqlConnection(connectionString); |
|
|
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connection).Options; |
|
|
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connection).Options; |
|
|
using var db = new SettleAccountDbContext(options); |
|
|
using var db = new SettleAccountDbContext(options); |
|
|
|
|
|
|
|
|
|
|
|
// 通过id查找作业项实体
|
|
|
var entity = db.Set<JobItem>().FirstOrDefault(o => o.Id == id); |
|
|
var entity = db.Set<JobItem>().FirstOrDefault(o => o.Id == id); |
|
|
if (entity != null) |
|
|
if (entity != null) |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 设置作业项为非运行状态
|
|
|
entity.IsRunning = false; |
|
|
entity.IsRunning = false; |
|
|
|
|
|
|
|
|
|
|
|
// 通过jobLogId查找作业日志实体
|
|
|
var log = db.Set<JobLog>().FirstOrDefault(o => o.Id == jobLogId); |
|
|
var log = db.Set<JobLog>().FirstOrDefault(o => o.Id == jobLogId); |
|
|
if (log != null) |
|
|
if (log != null) |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 更新作业日志的结束时间和成功状态
|
|
|
log.End = DateTime.Now; |
|
|
log.End = DateTime.Now; |
|
|
log.Success = true; |
|
|
log.Success = true; |
|
|
} |
|
|
} |
|
|
|
|
|
// 保存更改到数据库
|
|
|
db.SaveChanges(); |
|
|
db.SaveChanges(); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 停止作业项的方法
|
|
|
private void JobItemStop(Guid id) |
|
|
private void JobItemStop(Guid id) |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 获取数据库连接字符串
|
|
|
var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); |
|
|
var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); |
|
|
|
|
|
|
|
|
|
|
|
// 使用数据库连接
|
|
|
using var connection = new SqlConnection(connectionString); |
|
|
using var connection = new SqlConnection(connectionString); |
|
|
|
|
|
|
|
|
|
|
|
// 配置数据库上下文选项
|
|
|
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connection).Options; |
|
|
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connection).Options; |
|
|
|
|
|
|
|
|
|
|
|
// 使用结算账户数据库上下文
|
|
|
using var db = new SettleAccountDbContext(options); |
|
|
using var db = new SettleAccountDbContext(options); |
|
|
|
|
|
|
|
|
|
|
|
// 从数据库中获取指定id的作业项实体
|
|
|
var entity = db.Set<JobItem>().FirstOrDefault(o => o.Id == id); |
|
|
var entity = db.Set<JobItem>().FirstOrDefault(o => o.Id == id); |
|
|
if (entity != null) |
|
|
if (entity != null) |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 将作业项的运行状态设置为false
|
|
|
entity.IsRunning = false; |
|
|
entity.IsRunning = false; |
|
|
|
|
|
|
|
|
|
|
|
// 获取指定作业id且结束时间为空的作业日志,按开始时间降序排序
|
|
|
var log = db.Set<JobLog>().Where(o => o.JobId == id && o.End == null).OrderByDescending(o => o.Start).FirstOrDefault(); |
|
|
var log = db.Set<JobLog>().Where(o => o.JobId == id && o.End == null).OrderByDescending(o => o.Start).FirstOrDefault(); |
|
|
|
|
|
|
|
|
|
|
|
// 设置作业日志的执行状态为失败,并记录异常描述
|
|
|
log.Success = false; |
|
|
log.Success = false; |
|
|
log.Exception = "心跳超时,自动停止"; |
|
|
log.Exception = "心跳超时,自动停止"; |
|
|
|
|
|
|
|
|
|
|
|
// 保存数据库更改
|
|
|
db.SaveChanges(); |
|
|
db.SaveChanges(); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 通过作业项的ID启动作业项
|
|
|
private Guid? JobItemStart(Guid id) |
|
|
private Guid? JobItemStart(Guid id) |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 获取数据库连接字符串
|
|
|
var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); |
|
|
var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); |
|
|
|
|
|
// 使用连接字符串创建SqlConnection对象
|
|
|
using var connection = new SqlConnection(connectionString); |
|
|
using var connection = new SqlConnection(connectionString); |
|
|
|
|
|
// 使用连接字符串创建SettleAccountDbContext的DbContextOptions
|
|
|
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connection).Options; |
|
|
var options = new DbContextOptionsBuilder<SettleAccountDbContext>().UseSqlServer(connection).Options; |
|
|
|
|
|
// 使用DbContextOptions创建SettleAccountDbContext
|
|
|
using var db = new SettleAccountDbContext(options); |
|
|
using var db = new SettleAccountDbContext(options); |
|
|
|
|
|
// 从数据库中查询指定ID的作业项实体
|
|
|
var entity = db.Set<JobItem>().FirstOrDefault(o => o.Id == id); |
|
|
var entity = db.Set<JobItem>().FirstOrDefault(o => o.Id == id); |
|
|
|
|
|
// 如果作业项实体存在
|
|
|
if (entity != null) |
|
|
if (entity != null) |
|
|
{ |
|
|
{ |
|
|
|
|
|
// 设置作业项为运行状态,并更新心跳时间为当前时间
|
|
|
entity.IsRunning = true; |
|
|
entity.IsRunning = true; |
|
|
entity.HeartBeat = DateTime.Now; |
|
|
entity.HeartBeat = DateTime.Now; |
|
|
|
|
|
// 添加作业日志
|
|
|
var log = db.Set<JobLog>().Add(new JobLog { Start = DateTime.Now, JobId = entity.Id, Host = Dns.GetHostName() }); |
|
|
var log = db.Set<JobLog>().Add(new JobLog { Start = DateTime.Now, JobId = entity.Id, Host = Dns.GetHostName() }); |
|
|
|
|
|
// 保存作业日志到数据库
|
|
|
db.SaveChanges(); |
|
|
db.SaveChanges(); |
|
|
|
|
|
// 返回新增作业日志的ID
|
|
|
return log.Entity.Id; |
|
|
return log.Entity.Id; |
|
|
} |
|
|
} |
|
|
|
|
|
// 作业项不存在时返回空
|
|
|
return null; |
|
|
return null; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 从数据库中获取指定id的JobItem对象
|
|
|
private JobItem GetJobItem(Guid id) |
|
|
private JobItem GetJobItem(Guid id) |
|
|
{ |
|
|
{ |
|
|
var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); |
|
|
var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); |
|
@ -276,6 +349,7 @@ namespace Win.Sfs.SettleAccount.Entities.BQ |
|
|
return repo.FirstOrDefault(o => o.Id == id); |
|
|
return repo.FirstOrDefault(o => o.Id == id); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 更新指定id的JobItem对象的心跳时间为当前时间
|
|
|
private void JobItemHeartBeat(Guid id) |
|
|
private void JobItemHeartBeat(Guid id) |
|
|
{ |
|
|
{ |
|
|
var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); |
|
|
var connectionString = this._serviceProvider.GetRequiredService<IConfiguration>().GetConnectionString("SettleAccountService"); |
|
@ -290,6 +364,7 @@ namespace Win.Sfs.SettleAccount.Entities.BQ |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public void RemoveJob(JobItem item) |
|
|
public void RemoveJob(JobItem item) |
|
|
{ |
|
|
{ |
|
|
lock (_lockObj) |
|
|
lock (_lockObj) |
|
|