今天看啥  ›  专栏  ›  BlackSwift

Quartz任务与日志的持久化

BlackSwift  · 简书  ·  · 2017-11-25 23:47

Quartz任务与日志的持久化


定时任务在企业IT与互联网系统中使用非常广泛,一般用于去做耗时的分析、统计、报表、对账等任务. 现实开发中,一般采用Quartz作为Job实现。但是直接使用有如下痛点

  • 断电无法保存
  • 无法实现分布式锁
  • 无法回溯问题

本文将对此进行解决

1. 使用Quartz的Job持久化功能

在分析前,首先要上一个HelloWorld,首先集成Quartz,配置好数据源,并在数据库中刷好DDL,这个网上有很多入门教程

# 配置JobStore的实现类
org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate

然后对Job进行调度,查询qrtz_job_details就可以看到具体的内容了。

1.1. 悲观锁分析

当业务需要对某个定时任务进行调度时,将在业务代码中调用StdScheduler#scheduleJob(),通过对其实现类进行分析,它实际上调用的是

org.quartz.core.QuartzScheduler#scheduleJob(org.quartz.JobDetail, org.quartz.Trigger)

这里并没有立即执行任务,而是将Job放入了JobStore中(有点类似于队列)

resources.getJobStore().storeJobAndTrigger(jobDetail, trig);

此时,如果你的代码量阅读够大,有一定的嗅觉,你一定会对resources.getJobStore进行findUsage搜索,经过各种搜索,我们发现了锁(QRTZ_LOCKS)的实现,主要位于StdRowLockSemaphoreJobStoreSupport

下文删除了部分代码以简化板面

protected <T> T executeInNonManagedTXLock(
        String lockName,
        TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
    boolean transOwner = false;
    Connection conn = null;
    try {
        // 这里的`for update`是对当前row的互斥锁,如果没拿到将阻塞等待
        // select * from QRTZ_LOCKS t where t.lock_name=${lockName} for update
        // insert into (SCHED_NAME, LOCK_NAME) VAULE ...
        transOwner = getLockHandler().obtainLock(conn, lockName);
        // 执行更新Trigger/Job等非安全的操作
        final T result = txCallback.execute(conn);
        try {
            // 执行Commit提交事务,释放锁
            commitConnection(conn);
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
        }
        //调用 notifyAll();
        Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
        if (sigTime != null && sigTime >= 0) {
            signalSchedulingChangeImmediately(sigTime);
        }
        return result;
    } catch (JobPersistenceException e) {
        rollbackConnection(conn);
        throw e;
    } finally {
        try {
            // 此部分仅更新ThreadLocal,不涉及DB
            releaseLock(lockName, transOwner);
        } finally {
            cleanupConnection(conn);
        }
    }
}

总的来说,就是利用for update拖着不提交,等干完活再提交来实现的。

1.2. 防止程序同时执行

在Quartz中,通过注解@DisallowConcurrentExecution实现,当Trigger在完成retrieveJob后,将检查Job的注解,如果Job已经在运行,将continue跳过此Job

2. 定制历史记录插件

1. Quartz的Plugin系统

插件系统可以充分利用各种Listener,在Trigger或者Job执行生命周期中实现AOP定制,比如下面就是在Trigger中加日志的插件。在配置文件中加入如下后,当Tigger完成后将自动打上Log。

org.quartz.plugin.his.class=org.quartz.plugins.history.LoggingTriggerHistoryPlugin
org.quartz.plugin.his.triggerFiredMessage=Trigger [{1}.{0}] fired job ...
org.quartz.plugin.his.triggerCompleteMessage=Trigger [{1}.{0}] completed ...
org.quartz.plugin.his.triggerMisfiredMessage=Trigger [{1}.{0}] misfired job ...

插件的实现原理很简单,它没有依赖Spring注入,而是在Factory初始化读取Prop时通过Class.newInstance生成对象实例,并读取prop(比如上面的triggerFiredMessage)调用反射setTriggerFiredMessage()注入属性

// 通过Class.newInstance,并反射`setProp`注入参数
org.quartz.impl.StdSchedulerFactory#instantiate();
// 以上图为例,首先截取掉前面的Group,然后实例化
def listenerClass = "org.quartz.plugins.history.LoggingTriggerHistoryPlugin"
def listener = loadHelper.loadClass(listenerClass).newInstance();
def m = listener.getClass().getMethod("set" + "TriggerFiredMessage", [String.class]);
m.invoke(listener,"Trigger [{1}.{0}] fired job ...")

这里我曾经由于强迫症,尝试使用YAML替代Properties,事实证明这个是需要进行一定技巧的改造,由于直接通过Yaml转过来的是HashMap树形结构,而不是平行的关系,因此你需要设计一个Flattern操作来实现。

2. 历史执行记录的Plugin定制

定制历史记录主要是方便日后有据可查(有锅不背),以减少黑盒问题。下文只提供思路,不提供源码。

2.1. 定制Listener

定制JobListenerSchedulerPlugin接口,并在jobWasExecuted中进行记录操作,此处可以参考LoggingJobHistoryPlugin,并通过context获取Fire相关,Map相关以及Result相关

public interface JobListener {
    //重写如下方法,并写入日志
    void jobWasExecuted(JobExecutionContext context,
            JobExecutionException jobException);
}

除了cron等基本信息外,这里最重要的就是JobExecutionContext.getResult()还有任务中的dataMap,需要结合具体业务进行反序列化,这里也就是为什么网上基本看不到开源实现的原因,因为都是与业务强绑定的。

2.2. 定制DBAppender

写入可以参考LogbackDBAppender的实现方式,不借助ORM框架实现高性能写入日志,拼装与commit操作,由于我删除了Event,因此没有加锁,可以参考这里,伪代码如下

String getInsertSQL(){
    //此部分使用了SEQ,代替了默认的Trigger,只在Oracle下测试通过
    return "INSERT INTO JOB_HIS (ID, ....) VALUES (TASK_SQL.nextval, ?, ?, ?, ?...)";
}

protected appendDBlog(T t){
    Connection connection = null;
    PreparedStatement insertStatement = null;
    try{
        connection = getConnection();
        connection.setAutoCommit(false);
        insertStatement = connection.prepareStatement(getInsertSQL(), new String[] { EVENT_ID_COL_NAME });
        prepareStatement.setString(1,..);
        ....
        connection.commit();
    }catch(Exception e){
        //各种异常
    }finally{
        //关闭DB流
    }
}

此处主要难点在移植DBAppeder,并复用Quartz的DataSource。

2.3. 定制日志DDL

DDL可以参考Logback的SQL,基本上可以复用,但是在Logback中,默认是使用的Trigger,由于这个Trigger权限有时候不好搞到,我们可以用SEQ代替。

当日志快要满时,我们可以手动或者Trigger删除旧的日志

delete from JOB_HIS where FIRE_DATE < sysdate - 90;

总结

目前此日志插件在内部已经正常使用(主要还是并发不大,而且侧重点在可以回溯问题),希望能给踩坑Quartz的读者提供一个思路。

不足

  • 对时间要求极高,需要1s以内



原文地址:访问原文地址
快照地址: 访问文章快照