Quartz.NET作为一个开源的作业调度库,广泛应用于.NET应用程序中,以实现复杂的定时任务,本次记录利用Quartz.NET实现HTTP作业调度,通过自定义HTTP作业,实现对外部API的定时调用和如何管理这些作业,包括创建、修改、暂停、恢复和删除作业。
public class HttpJob : IJob
{
public static readonly Dictionary<string, HttpJobInfo> Delegates = new();
public async Task Execute(IJobExecutionContext context)
{
var delegateKey = context.JobDetail.JobDataMap.GetString("delegateKey");
if (delegateKey != null && Delegates.TryGetValue(delegateKey, out var func))
{
var requestBody = new RestRequest();
if (func.Headers != null)
{
foreach (var header in func.Headers)
{
requestBody.AddHeader(header.Key, header.Value);
}
}
var content = HttpHelper.HttpRequest(func.Url, func.Request, requestBody);
JobLogHelper.AddJobLog(new JobLog() { JobName = context.JobDetail.Key.Name, GroupName = context.JobDetail.Key.Group, RunTime = DateTime.Now, RunResult = content });
UpdateLastExecutionTime(context.JobDetail.Key.Name, context.JobDetail.Key.Group, DateTime.Now);
}
await Task.CompletedTask;
}
}
2.作业信息的持久化:为了持久化作业信息,定义了JobInfo类来存储作业的基本信息,如名称、组名、Cron表达式等,并将这些信息保存在本地的JSON文件中。
public class JobInfo
{
public required string JobName { get; set; }
public required string GroupName { get; set; }
public required string CronExpression { get; set; }
public DateTime LastExecutionTime { get; set; }
public JobStatus Status { get; set; }
public required HttpJobInfo HttpJob { get; set; }
}
3.实现了QuartzHelper类,用于管理作业的生命周期。这包括加载作业信息、创建作业、调度作业、暂停/恢复作业以及删除作业等功能。
public class QuartzHelper
{
private IScheduler scheduler;
private List<JobInfo> jobInfos;
private string filePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "jobs.json");
public QuartzHelper()
{
ISchedulerFactory schedulerFactory = new StdSchedulerFactory();
scheduler = schedulerFactory.GetScheduler().Result;
scheduler.Start().Wait();
LoadJobInfosApi().Wait();
}
private void SaveJobInfos()
{
string json = JsonConvert.SerializeObject(jobInfos);
File.WriteAllText(filePath, json);
}
private async Task LoadJobInfosApi()
{
if (File.Exists(filePath))
{
string json = File.ReadAllText(filePath);
jobInfos = JsonConvert.DeserializeObject<List<JobInfo>>(json) ?? new List<JobInfo>();
foreach (var jobInfo in jobInfos)
{
var delegateKey = Guid.NewGuid().ToString();
HttpJob.Delegates[delegateKey] = jobInfo.HttpJob;
IJobDetail job = JobBuilder.Create<HttpJob>()
.WithIdentity(jobInfo.JobName, jobInfo.GroupName).UsingJobData("delegateKey", delegateKey)
.Build();
ITrigger trigger = TriggerBuilder.Create()
.WithIdentity(jobInfo.JobName, jobInfo.GroupName)
.WithCronSchedule(jobInfo.CronExpression)
.Build();
await scheduler.ScheduleJob(job, trigger);
if (jobInfo.Status == JobStatus.正常运行)
{
await ResumeJob(jobInfo.JobName, jobInfo.GroupName);
}
else
{
await PauseJob(jobInfo.JobName, jobInfo.GroupName);
}
}
}
else
{
jobInfos = new List<JobInfo>();
}
}
#region 执行普通任务时使用,传委托时可以参考此方法
#endregion
public async Task AddJobApi(string jobName, string groupName, string cronExpression, HttpJobInfo httpJobInfo, string description = "")
{
if (jobInfos.Any(c => c.JobName == jobName && c.GroupName == groupName))
{
return;
}
var delegateKey = Guid.NewGuid().ToString();
HttpJob.Delegates[delegateKey] = httpJobInfo;
var jobInfo = new JobInfo { JobName = jobName, GroupName = groupName, CronExpression = cronExpression, HttpJob = httpJobInfo, Status = JobStatus.正常运行, Description = description, JobCreateTime = DateTime.Now };
jobInfos.Add(jobInfo);
SaveJobInfos();
IJobDetail job = JobBuilder.Create<HttpJob>()
.WithIdentity(jobName, groupName)
.UsingJobData("delegateKey", delegateKey)
.Build();
ITrigger trigger = TriggerBuilder.Create()
.WithIdentity(jobName + "Trigger", groupName)
.StartNow()
.WithCronSchedule(cronExpression).WithDescription(description)
.Build();
await scheduler.ScheduleJob(job, trigger);
}
public async Task PauseJob(string jobName, string groupName)
{
await scheduler.PauseJob(new JobKey(jobName, groupName));
var job = jobInfos.FirstOrDefault(j => j.JobName == jobName && j.GroupName == groupName);
if (job != null)
{
job.Status = JobStatus.暂停;
SaveJobInfos();
}
}
public async Task ResumeJob(string jobName, string groupName)
{
await scheduler.ResumeJob(new JobKey(jobName, groupName));
var job = jobInfos.FirstOrDefault(j => j.JobName == jobName && j.GroupName == groupName);
if (job != null)
{
job.Status = JobStatus.正常运行;
SaveJobInfos();
}
}
public async Task TriggerJob(string jobName, string groupName)
{
await scheduler.TriggerJob(new JobKey(jobName, groupName));
var job = jobInfos.FirstOrDefault(j => j.JobName == jobName && j.GroupName == groupName);
if (job != null)
{
job.LastExecutionTime = DateTime.Now;
SaveJobInfos();
}
}
public async Task ModifyJob(string jobName, string groupName, string cronExpression, HttpJobInfo httpJobInfo, string description = "")
{
await DeleteJob(jobName, groupName);
await AddJobApi(jobName, groupName, cronExpression, httpJobInfo, description);
}
public async Task DeleteJob(string jobName, string groupName)
{
await scheduler.DeleteJob(new JobKey(jobName, groupName));
jobInfos.RemoveAll(j => j.JobName == jobName && j.GroupName == groupName);
SaveJobInfos();
}
public List<JobInfo> GetAllJobs()
{
if (File.Exists(filePath))
{
string json = File.ReadAllText(filePath);
jobInfos = JsonConvert.DeserializeObject<List<JobInfo>>(json) ?? new List<JobInfo>();
return jobInfos;
}
else
return null;
}
}
QuartzHelper
4.为了跟踪作业的执行情况,设计了JobLog类和JobLogHelper类,用于记录和查询作业执行日志。
public class JobLogHelper
{
private static string _filePath;
public static List<JobLog> GetJobLog(string jobName, string groupName)
{
_filePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"jobsLog-{DateTime.Now:yyyyMMdd}.json");
if (!File.Exists(_filePath))
{
return new List<JobLog>();
}
var jsonText = $"[{File.ReadAllText(_filePath)}]";
var list = JsonConvert.DeserializeObject<List<JobLog>>(jsonText);
if (list != null)
{
var result = list.Where(c => c.JobName == jobName && groupName == c.GroupName).OrderByDescending(c => c.RunTime).ToList();
return result;
}
return null;
}
public static List<JobLog> GetAllLogs()
{
List<JobLog> jobLogs = new List<JobLog>();
var logFilePaths = Directory.GetFiles(AppDomain.CurrentDomain.BaseDirectory, "jobsLog-*.json");
logFilePaths.ToList().ForEach(c =>
{
var jsonText = $"[{File.ReadAllText(_filePath)}]";
var list = JsonConvert.DeserializeObject<List<JobLog>>(jsonText);
if (list != null) jobLogs.AddRange(list);
});
return jobLogs;
}
public static void AddJobLog(JobLog jobLog)
{
_filePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"jobsLog-{DateTime.Now:yyyyMMdd}.json");
string json = JsonConvert.SerializeObject(jobLog) + ",\n";
File.AppendAllText(_filePath, json);
}
}
作业执行日志
5.最后,通过ASP.NET Core的Controller提供了一系列Web API接口,以便于通过HTTP请求管理作业。这些接口包括获取作业列表、添加作业、修改作业、删除作业、暂停作业、恢复作业和立即执行作业等。
[Route("api/[controller]")]
[ApiController]
public class QuartzController : ControllerBase
{
private readonly QuartzHelper _quartzHelper;
public QuartzController(QuartzHelper quartzHelper)
{
_quartzHelper = quartzHelper;
}
[HttpGet]
[Route("job/GetJobs")]
public object GetJobs()
{
return Ok(new {code=200,data = _quartzHelper.GetAllJobs() });
}
[HttpGet]
[Route("job/GetJobLog")]
public object GetJobLog(string jobName, string groupName)
{
return Ok(new { code = 200, data = JobLogHelper.GetJobLog(jobName, groupName) });
}
[HttpGet]
[Route("job/GetJobLogs")]
public object GetJobLogs()
{
return Ok(new { code = 200, data = JobLogHelper.GetAllLogs() });
}
[HttpPost]
[Route("job/AddJob")]
public async Task<object> Add(JobInfo jobInfo)
{
try
{
await _quartzHelper.AddJobApi(jobInfo.JobName, jobInfo.GroupName, jobInfo.CronExpression, jobInfo.HttpJob, jobInfo.Description);
return Ok(new { code = 200, msg = "创建成功!" });
}
catch (Exception ex)
{
return Ok(new { code = 500, msg = ex.Message });
}
}
[HttpPost]
[Route("job/ModifyJob")]
public async Task<object> Edit(JobInfo jobInfo)
{
try
{
await _quartzHelper.ModifyJob(jobInfo.JobName, jobInfo.GroupName, jobInfo.CronExpression, jobInfo.HttpJob, jobInfo.Description);
return Ok(new { code = 200, msg = "修改成功!" });
}
catch (Exception ex)
{
return Ok(new { code = 500, msg = ex.Message });
}
}
[HttpGet]
[Route("job/DeleteJob")]
public async Task<object> Delete(string jobName, string groupName)
{
try
{
await _quartzHelper.DeleteJob(jobName, groupName);
return Ok(new { code = 200, msg = "删除成功!" });
}
catch (Exception ex)
{
return Ok(new { code = 500, msg = ex.Message });
}
}
[HttpGet]
[Route("job/PauseJob")]
public async Task<object> PauseJob(string jobName, string groupName)
{
try
{
await _quartzHelper.PauseJob(jobName, groupName);
return Ok(new { code = 200, msg = "暂停成功!" });
}
catch (Exception ex)
{
return Ok(new { code = 500, msg = ex.Message });
}
}
[HttpGet]
[Route("job/ResumeJob")]
public async Task<object> ResumeJob(string jobName, string groupName)
{
try
{
await _quartzHelper.ResumeJob(jobName, groupName);
return Ok(new { code = 200, msg = "开启任务成功!" });
}
catch (Exception ex)
{
return Ok(new { code = 500, msg = ex.Message });
}
}
[HttpGet]
[Route("job/TriggerJob")]
public async Task<object> TriggerJob(string jobName, string groupName)
{
try
{
await _quartzHelper.TriggerJob(jobName, groupName);
return Ok(new { code = 200, msg = "立即执行任务命令已执行!" });
}
catch (Exception ex)
{
return Ok(new { code = 500, msg = ex.Message });
}
}
}
Web API接口
源码地址:https://github.com/yycb1994/Quartz.Net
转自:https://www.cnblogs.com/INetIMVC/p/18281699
该文章在 2024/9/7 11:10:58 编辑过