Quartz任务与日志的持久化
定时任务在企业IT与互联网系统中使用非常广泛,一般用于去做耗时的分析、统计、报表、对账等任务. 现实开发中,一般采用Quartz作为Job实现。但是直接使用有如下痛点
本文将对此进行解决
1. 使用Quartz的Job持久化功能
在分析前,首先要上一个HelloWorld,首先集成Quartz,配置好数据源,并在数据库中刷好DDL,这个网上有很多入门教程
# 配置JobStore的实现类
org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
然后对Job进行调度,查询qrtz_job_details
就可以看到具体的内容了。
1.1. 悲观锁分析
当业务需要对某个定时任务进行调度时,将在业务代码中调用StdScheduler#scheduleJob()
,通过对其实现类进行分析,它实际上调用的是
org.quartz.core.QuartzScheduler#scheduleJob(org.quartz.JobDetail, org.quartz.Trigger)
这里并没有立即执行任务,而是将Job放入了JobStore中(有点类似于队列)
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
此时,如果你的代码量阅读够大,有一定的嗅觉,你一定会对resources.getJobStore
进行findUsage搜索,经过各种搜索,我们发现了锁(QRTZ_LOCKS)的实现,主要位于StdRowLockSemaphore
与JobStoreSupport
下文删除了部分代码以简化板面
protected <T> T executeInNonManagedTXLock(
String lockName,
TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
boolean transOwner = false;
Connection conn = null;
try {
// 这里的`for update`是对当前row的互斥锁,如果没拿到将阻塞等待
// select * from QRTZ_LOCKS t where t.lock_name=${lockName} for update
// insert into (SCHED_NAME, LOCK_NAME) VAULE ...
transOwner = getLockHandler().obtainLock(conn, lockName);
// 执行更新Trigger/Job等非安全的操作
final T result = txCallback.execute(conn);
try {
// 执行Commit提交事务,释放锁
commitConnection(conn);
} catch (JobPersistenceException e) {
rollbackConnection(conn);
}
//调用 notifyAll();
Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
if (sigTime != null && sigTime >= 0) {
signalSchedulingChangeImmediately(sigTime);
}
return result;
} catch (JobPersistenceException e) {
rollbackConnection(conn);
throw e;
} finally {
try {
// 此部分仅更新ThreadLocal,不涉及DB
releaseLock(lockName, transOwner);
} finally {
cleanupConnection(conn);
}
}
}
总的来说,就是利用for update
拖着不提交,等干完活再提交来实现的。
1.2. 防止程序同时执行
在Quartz中,通过注解@DisallowConcurrentExecution
实现,当Trigger在完成retrieveJob
后,将检查Job的注解,如果Job已经在运行,将continue跳过此Job
2. 定制历史记录插件
1. Quartz的Plugin系统
插件系统可以充分利用各种Listener,在Trigger或者Job执行生命周期中实现AOP定制,比如下面就是在Trigger中加日志的插件。在配置文件中加入如下后,当Tigger完成后将自动打上Log。
org.quartz.plugin.his.class=org.quartz.plugins.history.LoggingTriggerHistoryPlugin
org.quartz.plugin.his.triggerFiredMessage=Trigger [{1}.{0}] fired job ...
org.quartz.plugin.his.triggerCompleteMessage=Trigger [{1}.{0}] completed ...
org.quartz.plugin.his.triggerMisfiredMessage=Trigger [{1}.{0}] misfired job ...
插件的实现原理很简单,它没有依赖Spring注入,而是在Factory初始化读取Prop时通过Class.newInstance生成对象实例,并读取prop(比如上面的triggerFiredMessage
)调用反射setTriggerFiredMessage()
注入属性
// 通过Class.newInstance,并反射`setProp`注入参数
org.quartz.impl.StdSchedulerFactory#instantiate();
// 以上图为例,首先截取掉前面的Group,然后实例化
def listenerClass = "org.quartz.plugins.history.LoggingTriggerHistoryPlugin"
def listener = loadHelper.loadClass(listenerClass).newInstance();
def m = listener.getClass().getMethod("set" + "TriggerFiredMessage", [String.class]);
m.invoke(listener,"Trigger [{1}.{0}] fired job ...")
这里我曾经由于强迫症,尝试使用YAML替代Properties,事实证明这个是需要进行一定技巧的改造,由于直接通过Yaml转过来的是HashMap树形结构,而不是平行的关系,因此你需要设计一个Flattern操作来实现。
2. 历史执行记录的Plugin定制
定制历史记录主要是方便日后有据可查(有锅不背),以减少黑盒问题。下文只提供思路,不提供源码。
2.1. 定制Listener
定制JobListener
与SchedulerPlugin
接口,并在jobWasExecuted
中进行记录操作,此处可以参考LoggingJobHistoryPlugin
,并通过context
获取Fire相关,Map相关以及Result相关
public interface JobListener {
//重写如下方法,并写入日志
void jobWasExecuted(JobExecutionContext context,
JobExecutionException jobException);
}
除了cron等基本信息外,这里最重要的就是JobExecutionContext.getResult()
还有任务中的dataMap
,需要结合具体业务进行反序列化,这里也就是为什么网上基本看不到开源实现的原因,因为都是与业务强绑定的。
2.2. 定制DBAppender
写入可以参考Logback中DBAppender
的实现方式,不借助ORM框架实现高性能写入日志,拼装与commit操作,由于我删除了Event,因此没有加锁,可以参考这里,伪代码如下
String getInsertSQL(){
//此部分使用了SEQ,代替了默认的Trigger,只在Oracle下测试通过
return "INSERT INTO JOB_HIS (ID, ....) VALUES (TASK_SQL.nextval, ?, ?, ?, ?...)";
}
protected appendDBlog(T t){
Connection connection = null;
PreparedStatement insertStatement = null;
try{
connection = getConnection();
connection.setAutoCommit(false);
insertStatement = connection.prepareStatement(getInsertSQL(), new String[] { EVENT_ID_COL_NAME });
prepareStatement.setString(1,..);
....
connection.commit();
}catch(Exception e){
//各种异常
}finally{
//关闭DB流
}
}
此处主要难点在移植DBAppeder,并复用Quartz的DataSource。
2.3. 定制日志DDL
DDL可以参考Logback的SQL,基本上可以复用,但是在Logback中,默认是使用的Trigger,由于这个Trigger权限有时候不好搞到,我们可以用SEQ代替。
当日志快要满时,我们可以手动或者Trigger删除旧的日志
delete from JOB_HIS where FIRE_DATE < sysdate - 90;
总结
目前此日志插件在内部已经正常使用(主要还是并发不大,而且侧重点在可以回溯问题),希望能给踩坑Quartz的读者提供一个思路。
不足