文章
问答
冒泡
基于Quartz实现动态调度任务

前言

市面上有很多功能强大的任务调度框架,如 xxl-job、PowerJob 等,它们都可以单独部署并且支持分布式。但是对于一般的小型系统来说,这些分布式调度框架的功能过于强大;很多时候系统只需要简单的、可动态配置的定时任务功能,这个时候使用 Quartz 就比较合适。

功能分析

Quartz 的核心功能类主要有 Job(任务接口)、Scheduler(任务调度器)、JobDetail(任务实例对象)以及 Trigger(触发器)。要实现动态配置,其实就是从数据库中读取任务的执行周期(cron 表达式)和要执行的方法(调用目标字符串),通过反射获取并调用具体的方法,然后交给 Scheduler 去调度即可。

第一步,在springboot项目中引入quartz依赖

<!-- Adds the Spring Boot Quartz starter to the project's Maven dependencies. -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

第二步,定义数据库实体类(SysJob和SysJobLog)

/**
 * Database entity describing one dynamically configured scheduled task:
 * what to invoke, when to invoke it (cron), and how to treat missed runs.
 */
public class SysJob extends BaseLogicEntity {
	/** Job identifier; also used to derive the Quartz job and trigger keys. */
	@TableId(type = IdType.ASSIGN_ID)
	private String id;
	/** Job name; copied into each execution log entry. */
	private String jobName;
	/**
	* Invocation target string.
	* Bean call example: the bean must carry @Component or @Service. Target string: testTask.test()
	* Class call example: give the class with its full package and the method. Target string: com.xxx.quartz.task.testTask.test()
	*/
	private String invokeTarget;
	/**
	* Cron expression that defines the execution schedule.
	*/
	private String cronExpression;
	/**
	* Misfire policy in detail:
	* Fire immediately (every misfired run is executed right away). For example, if the 9:00 run misfired and the system recovers at 10:15, the misfired 9:00 and 10:00 runs are executed immediately.
	* Fire once (misfired runs are merged; the next period runs normally). Suppose the 9:00 and 10:00 runs both misfired and the system came back at 10:15: only one misfired run is executed, and the next run happens on schedule.
	* Do nothing (all misfired runs are ignored; the next period runs normally).
	*/
	private String misfirePolicy = ScheduleConstants.MISFIRE_DEFAULT;
	/** Job status; compared against the ScheduleConstants.Status values (NORMAL / PAUSE) by the scheduling code. */
	private String status;
	/** Free-form remark (presumably a human-readable note; not read by the scheduling code shown here). */
	private String remark;
}
/**
 * Database entity holding the outcome of a single job execution.
 * Instances are filled in and persisted by QuartzJob after each run.
 */
public class SysJobLog extends BaseLogicEntity {
    /** Log entry identifier. */
    @TableId(type = IdType.ASSIGN_ID)
    private String id;
    /** Name of the job that was executed (copied from SysJob). */
    private String jobName;
    /** Invocation target string of the executed job (copied from SysJob). */
    private String invokeTarget;
    /** Summary message written by QuartzJob; contains the job name and the elapsed time. */
    private String jobMessage;
    /** Execution result; QuartzJob sets ScheduleConstants.SUCCESS or ScheduleConstants.FAIL. */
    private String status;
    /** Error text of a failed execution; QuartzJob truncates it to 2000 characters. */
    private String exceptionInfo;
    /** Instant at which the execution started. */
    private Instant startTime;
    /** Instant at which the execution finished. */
    private Instant stopTime;
}

第三步,实现Job接口(QuartzJob)

@Slf4j
public class QuartzJob implements Job {
    /**
     * 线程本地变量
     */
    private static ThreadLocal<Instant> threadLocal = new ThreadLocal<>();

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        SysJob sysJob = OrikaUtils.getMapperFacade().map(context.getMergedJobDataMap().get(ScheduleConstants.TASK_PROPERTIES), SysJob.class);
        try {
            before(context, sysJob);
            if (sysJob != null) {
            	// 通过反射调用方法
				JobInvokeUtils.invokeMethod(sysJob);
            }
            after(context, sysJob, null);
        } catch (Exception e) {
            log.error("任务执行异常  - :", e);
            after(context, sysJob, e);
        }
    }

    /**
     * 执行前
     *
     * @param context 工作执行上下文对象
     * @param sysJob  系统计划任务
     */
    protected void before(JobExecutionContext context, SysJob sysJob) {
        threadLocal.set(Instant.now());
    }

    /**
     * 执行后
     *
     * @param context 工作执行上下文对象
     * @param sysJob  系统计划任务
     */
    protected void after(JobExecutionContext context, SysJob sysJob, Exception e) {
        Instant startTime = threadLocal.get();
        threadLocal.remove();

        final SysJobLog sysJobLog = new SysJobLog();
        sysJobLog.setJobName(sysJob.getJobName());
        sysJobLog.setInvokeTarget(sysJob.getInvokeTarget());
        sysJobLog.setStartTime(startTime);
        sysJobLog.setStopTime(Instant.now());
        long runMs = ChronoUnit.SECONDS.between(sysJobLog.getStopTime(), sysJobLog.getStartTime());
        sysJobLog.setJobMessage(sysJobLog.getJobName() + " 总共耗时:" + runMs + "毫秒");
        if (e != null) {
            sysJobLog.setStatus(ScheduleConstants.FAIL);
            String errorMsg = StringUtils.substring(e.getMessage(), 0, 2000);
            sysJobLog.setExceptionInfo(errorMsg);
        } else {
            sysJobLog.setStatus(ScheduleConstants.SUCCESS);
        }

        // 写入数据库当中
        SpringUtils.getBean(SysJobLogService.class).addJobLog(sysJobLog);
    }

}

第四步,编写工具类(任务调度工具类、反射工具类、Cron表达式工具类)

/**
 * Helpers for registering {@link SysJob} definitions with a Quartz {@link Scheduler}.
 */
public class ScheduleUtils {

    /**
     * Builds the trigger key for a job id.
     *
     * @param jobId job identifier
     * @return trigger key named {@code ScheduleConstants.TASK_CLASS_NAME + jobId}
     */
    public static TriggerKey getTriggerKey(String jobId) {
        return TriggerKey.triggerKey(ScheduleConstants.TASK_CLASS_NAME + jobId);
    }

    /**
     * Builds the job key for a job id.
     *
     * @param jobId job identifier
     * @return job key named {@code ScheduleConstants.TASK_CLASS_NAME + jobId}
     */
    public static JobKey getJobKey(String jobId) {
        return JobKey.jobKey(ScheduleConstants.TASK_CLASS_NAME + jobId);
    }

    /**
     * Creates (or re-creates) the Quartz job and cron trigger for the given definition.
     * An existing job with the same key is removed first; the job is only scheduled when
     * the cron expression still has a future fire time, and it is paused afterwards when
     * the definition's status is PAUSE.
     *
     * @param scheduler scheduler to register the job with
     * @param job       job definition
     * @throws SchedulerException if Quartz rejects the job or the misfire policy is unknown
     */
    public static void createScheduleJob(Scheduler scheduler, SysJob job) throws SchedulerException {
        String jobId = job.getId();
        JobKey jobKey = getJobKey(jobId);

        // Job detail bound to the generic reflective executor
        JobDetail jobDetail = JobBuilder.newJob(QuartzJob.class).withIdentity(jobKey).build();

        // Cron schedule with the configured misfire handling
        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
        cronScheduleBuilder = handleCronScheduleMisfirePolicy(job, cronScheduleBuilder);

        CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(getTriggerKey(jobId))
                .withSchedule(cronScheduleBuilder).build();

        // Make the definition available to QuartzJob at execution time
        jobDetail.getJobDataMap().put(ScheduleConstants.TASK_PROPERTIES, job);

        // Remove any stale registration before creating the new one
        if (scheduler.checkExists(jobKey)) {
            scheduler.deleteJob(jobKey);
        }

        // Only schedule when the expression will fire again (i.e. the job has not expired)
        if (ObjectUtils.isNotEmpty(CronUtils.getNextExecution(job.getCronExpression()))) {
            scheduler.scheduleJob(jobDetail, trigger);
        }

        // Constant-first comparison: a job without a status must not cause a NullPointerException
        if (ScheduleConstants.Status.PAUSE.getValue().equals(job.getStatus())) {
            scheduler.pauseJob(jobKey);
        }
    }

    /**
     * Applies the job's misfire policy to the cron schedule builder.
     * A missing ({@code null}) policy is treated as the default policy, matching the
     * default value of {@code SysJob.misfirePolicy}.
     *
     * @param job job definition providing the misfire policy
     * @param cb  builder to configure
     * @return the configured builder
     * @throws SchedulerException if the policy value is not one of the known constants
     */
    public static CronScheduleBuilder handleCronScheduleMisfirePolicy(SysJob job, CronScheduleBuilder cb) throws SchedulerException {
        String misfirePolicy = job.getMisfirePolicy();
        if (misfirePolicy == null) {
            // switch on a null String would throw NullPointerException
            return cb;
        }
        switch (misfirePolicy) {
            case ScheduleConstants.MISFIRE_DEFAULT:
                return cb;
            case ScheduleConstants.MISFIRE_IGNORE_MISFIRES:
                return cb.withMisfireHandlingInstructionIgnoreMisfires();
            case ScheduleConstants.MISFIRE_FIRE_AND_PROCEED:
                return cb.withMisfireHandlingInstructionFireAndProceed();
            case ScheduleConstants.MISFIRE_DO_NOTHING:
                return cb.withMisfireHandlingInstructionDoNothing();
            default:
                throw new SchedulerException("The task misfire policy '" + job.getMisfirePolicy()
                        + "' cannot be used in cron schedule tasks");
        }
    }
}
/**
 * Reflection helper that executes the method described by a job's invoke-target string.
 *
 * <p>Target format: {@code beanName.method(args)} for a Spring bean, or
 * {@code fully.qualified.ClassName.method(args)} for a class that is instantiated through
 * its no-argument constructor. Supported argument literals: quoted strings,
 * {@code true}/{@code false}, longs with an {@code L} suffix, doubles with a {@code D}
 * suffix, and plain integers. Methods are looked up with the boxed types
 * ({@code String}, {@code Boolean}, {@code Long}, {@code Double}, {@code Integer}), so the
 * target method must declare exactly those parameter types.
 *
 * <p>SECURITY NOTE: the invoke target comes from stored job configuration and allows
 * instantiating an arbitrary class and calling an arbitrary public method on it. Restrict
 * who may create or edit jobs, and consider validating targets against an allow-list
 * before they are saved.
 */
public class JobInvokeUtils {
    /**
     * Executes the method configured on the given job.
     *
     * @param sysJob job whose invoke target is executed
     * @throws Exception if the target cannot be resolved, instantiated or invoked, or if
     *                   the invoked method itself throws
     */
    public static void invokeMethod(SysJob sysJob) throws Exception {
        String invokeTarget = sysJob.getInvokeTarget();
        String beanName = getBeanName(invokeTarget);
        String methodName = getMethodName(invokeTarget);
        List<Object[]> methodParams = getMethodParams(invokeTarget);

        Object bean;
        if (!isValidClassName(beanName)) {
            // Short name: resolve as a Spring bean
            bean = SpringUtils.getBean(beanName);
        } else {
            // Qualified name: instantiate through the no-arg constructor
            // (Class.newInstance() is deprecated since Java 9)
            bean = Class.forName(beanName).getDeclaredConstructor().newInstance();
        }
        invokeMethod(bean, methodName, methodParams);
    }

    /**
     * Invokes the named public method on the target object.
     *
     * @param bean         target object
     * @param methodName   method name
     * @param methodParams parsed parameters as {@code {value, type}} pairs; may be {@code null} or empty
     */
    private static void invokeMethod(Object bean, String methodName, List<Object[]> methodParams)
            throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException,
            InvocationTargetException {
        if (CollectionUtils.isNotEmpty(methodParams)) {
            Method method = bean.getClass().getMethod(methodName, getMethodParamsType(methodParams));
            method.invoke(bean, getMethodParamsValue(methodParams));
        } else {
            Method method = bean.getClass().getMethod(methodName);
            method.invoke(bean);
        }
    }

    /**
     * Tells whether the name looks like a package-qualified class name, i.e. contains
     * more than one dot.
     *
     * @param invokeTarget name to check
     * @return {@code true} if it is treated as a class name, {@code false} if as a bean name
     */
    public static boolean isValidClassName(String invokeTarget) {
        return StringUtils.countMatches(invokeTarget, ".") > 1;
    }

    /**
     * Extracts the bean or class name: everything before the last dot that precedes the
     * opening parenthesis.
     *
     * @param invokeTarget invoke-target string
     * @return bean or class name
     */
    public static String getBeanName(String invokeTarget) {
        String beforeArgs = StringUtils.substringBefore(invokeTarget, "(");
        return StringUtils.substringBeforeLast(beforeArgs, ".");
    }

    /**
     * Extracts the method name: the segment between the last dot and the opening parenthesis.
     *
     * @param invokeTarget invoke-target string
     * @return method name
     */
    public static String getMethodName(String invokeTarget) {
        String beforeArgs = StringUtils.substringBefore(invokeTarget, "(");
        return StringUtils.substringAfterLast(beforeArgs, ".");
    }

    /**
     * Parses the argument list between the parentheses of the invoke target.
     *
     * @param invokeTarget invoke-target string
     * @return list of {@code {value, type}} pairs, or {@code null} when there are no arguments
     * @throws NumberFormatException if a numeric literal cannot be parsed
     */
    public static List<Object[]> getMethodParams(String invokeTarget) {
        String methodStr = StringUtils.substringBetween(invokeTarget, "(", ")");
        if (StringUtils.isEmpty(methodStr)) {
            return null;
        }
        // Split on commas that are not inside a quoted string
        String[] rawParams = methodStr.split(",(?=([^\"']*[\"'][^\"']*[\"'])*[^\"']*$)");
        List<Object[]> parsed = new LinkedList<>();
        for (String rawParam : rawParams) {
            String str = StringUtils.trimToEmpty(rawParam);
            if (StringUtils.startsWithAny(str, "'", "\"")) {
                // String literal: starts with ' or "; strip the surrounding quotes
                parsed.add(new Object[]{StringUtils.substring(str, 1, str.length() - 1), String.class});
            } else if ("true".equalsIgnoreCase(str) || "false".equalsIgnoreCase(str)) {
                // Boolean literal
                parsed.add(new Object[]{Boolean.valueOf(str), Boolean.class});
            } else if (StringUtils.endsWith(str, "L")) {
                // Long literal: ends with L
                parsed.add(new Object[]{Long.valueOf(StringUtils.substring(str, 0, str.length() - 1)), Long.class});
            } else if (StringUtils.endsWith(str, "D")) {
                // Double literal: ends with D
                parsed.add(new Object[]{Double.valueOf(StringUtils.substring(str, 0, str.length() - 1)), Double.class});
            } else {
                // Anything else is treated as an integer
                parsed.add(new Object[]{Integer.valueOf(str), Integer.class});
            }
        }
        return parsed;
    }

    /**
     * Collects the parameter types from parsed {@code {value, type}} pairs.
     *
     * @param methodParams parsed parameters
     * @return parameter types in declaration order
     */
    public static Class<?>[] getMethodParamsType(List<Object[]> methodParams) {
        Class<?>[] types = new Class<?>[methodParams.size()];
        int index = 0;
        for (Object[] pair : methodParams) {
            types[index++] = (Class<?>) pair[1];
        }
        return types;
    }

    /**
     * Collects the parameter values from parsed {@code {value, type}} pairs.
     *
     * @param methodParams parsed parameters
     * @return parameter values in declaration order
     */
    public static Object[] getMethodParamsValue(List<Object[]> methodParams) {
        Object[] values = new Object[methodParams.size()];
        int index = 0;
        for (Object[] pair : methodParams) {
            values[index++] = pair[0];
        }
        return values;
    }
}
/**
 * Helpers for validating and evaluating cron expressions.
 */
public class CronUtils {

    /**
     * Tells whether the given cron expression is valid.
     *
     * @param cronExpression cron expression
     * @return {@code true} if the expression is valid
     */
    public static boolean isValid(String cronExpression) {
        return CronExpression.isValidExpression(cronExpression);
    }

    /**
     * Describes why the given cron expression is invalid.
     *
     * @param cronExpression cron expression
     * @return the parse error message, or {@code null} if the expression is valid
     */
    public static String getInvalidMessage(String cronExpression) {
        try {
            new CronExpression(cronExpression);
            return null;
        } catch (ParseException pe) {
            return pe.getMessage();
        }
    }

    /**
     * Computes the next fire time of the given cron expression, relative to now.
     *
     * @param cronExpression cron expression
     * @return the value of {@code CronExpression.getNextValidTimeAfter(now)}; callers in
     *         this file treat an empty result as "the job has expired"
     * @throws IllegalArgumentException if the expression cannot be parsed; the original
     *                                  {@link ParseException} is attached as the cause
     */
    public static Date getNextExecution(String cronExpression) {
        try {
            CronExpression cron = new CronExpression(cronExpression);
            return cron.getNextValidTimeAfter(new Date());
        } catch (ParseException e) {
            // Keep the cause so the parse position and stack trace are not lost
            throw new IllegalArgumentException(e.getMessage(), e);
        }
    }
}

第五步,编写Service层逻辑(CRUD)

/**
 * Service that keeps the persisted {@link SysJob} definitions and the Quartz
 * {@link Scheduler} in sync: every create, update, delete and status change is written
 * to the database first and then applied to the scheduler.
 */
@Service
@RequiredArgsConstructor
public class SysJobService {
    private final SysJobManager sysJobManager;
    private final Scheduler scheduler;

    /**
     * Rebuilds the scheduler on startup: clears it and registers every job returned by
     * {@code sysJobManager.selectAll()}.
     *
     * @throws SchedulerException if a job cannot be registered
     */
    @PostConstruct
    public void init() throws SchedulerException {
        scheduler.clear();
        List<SysJob> jobList = sysJobManager.selectAll();
        for (SysJob job : jobList) {
            ScheduleUtils.createScheduleJob(scheduler, job);
        }
    }

    /**
     * Returns one page of jobs matching the criteria.
     */
    public MSPageInfo<SysJob> page(SysJobCriteria criteria) {
        return sysJobManager.page(criteria);
    }

    /**
     * Loads a job by id, as returned by {@code sysJobManager.selectById}.
     */
    public SysJob selectById(String id) {
        return sysJobManager.selectById(id);
    }

    /**
     * Marks the job as paused in the database and, if that succeeded, pauses it in the scheduler.
     *
     * @return whether the database update succeeded
     */
    public boolean pauseJob(SysJob job) throws SchedulerException {
        String jobId = job.getId();
        job.setStatus(ScheduleConstants.Status.PAUSE.getValue());
        boolean flag = sysJobManager.updateById(job);
        if (flag) {
            scheduler.pauseJob(ScheduleUtils.getJobKey(jobId));
        }
        return flag;
    }

    /**
     * Marks the job as normal in the database and, if that succeeded, resumes it in the scheduler.
     *
     * @return whether the database update succeeded
     */
    public boolean resumeJob(SysJob job) throws SchedulerException {
        String jobId = job.getId();
        job.setStatus(ScheduleConstants.Status.NORMAL.getValue());
        boolean flag = sysJobManager.updateById(job);
        if (flag) {
            scheduler.resumeJob(ScheduleUtils.getJobKey(jobId));
        }
        return flag;
    }

    /**
     * Deletes the job from the database and, if that succeeded, from the scheduler.
     *
     * @return whether the database delete succeeded
     */
    public boolean deleteJob(SysJob job) throws SchedulerException {
        String jobId = job.getId();
        boolean flag = sysJobManager.deleteById(jobId);
        if (flag) {
            scheduler.deleteJob(ScheduleUtils.getJobKey(jobId));
        }
        return flag;
    }

    /**
     * Deletes the job with the given id.
     *
     * @throws MSException if no job with that id exists
     */
    public void deleteById(String id) throws SchedulerException {
        SysJob job = selectById(id);
        if (job == null) {
            // Fail with a meaningful error instead of a NullPointerException in deleteJob
            throw new MSException("任务不存在或已过期!");
        }
        deleteJob(job);
    }

    /**
     * Pauses or resumes the job according to its current status value.
     *
     * @return whether a status change was applied; {@code false} for an unrecognised status
     */
    public boolean changeStatus(SysJob job) throws SchedulerException {
        boolean flag = false;
        String status = job.getStatus();
        if (ScheduleConstants.Status.NORMAL.getValue().equals(status)) {
            flag = resumeJob(job);
        } else if (ScheduleConstants.Status.PAUSE.getValue().equals(status)) {
            flag = pauseJob(job);
        }
        return flag;
    }

    /**
     * Triggers the job once, immediately, using the definition currently stored in the database.
     *
     * @throws MSException if the job is unknown to the database or to the scheduler
     */
    public void run(SysJob job) throws SchedulerException {
        String jobId = job.getId();
        SysJob properties = selectById(jobId);
        JobKey jobKey = ScheduleUtils.getJobKey(jobId);
        // Without a stored definition there is nothing for QuartzJob to execute
        if (properties == null || !scheduler.checkExists(jobKey)) {
            throw new MSException("任务不存在或已过期!");
        }
        // Pass the fresh definition as the execution payload
        JobDataMap dataMap = new JobDataMap();
        dataMap.put(ScheduleConstants.TASK_PROPERTIES, properties);
        scheduler.triggerJob(jobKey, dataMap);
    }

    /**
     * Validates and stores a new job, then registers it with the scheduler.
     * New jobs always start in the PAUSE state.
     *
     * @return whether the database insert succeeded
     */
    public boolean insertJob(SysJob job) throws SchedulerException {
        validationJob(job);
        job.setStatus(ScheduleConstants.Status.PAUSE.getValue());
        boolean flag = sysJobManager.save(job);
        if (flag) {
            ScheduleUtils.createScheduleJob(scheduler, job);
        }
        return flag;
    }

    /**
     * Validates and updates a job, then re-registers it with the scheduler.
     *
     * @return whether the database update succeeded
     */
    public boolean updateJob(SysJob job) throws SchedulerException {
        validationJob(job);
        boolean flag = sysJobManager.updateById(job);
        if (flag) {
            updateSchedulerJob(job);
        }
        return flag;
    }

    /**
     * Replaces the scheduler registration of the job with one built from the given definition.
     */
    public void updateSchedulerJob(SysJob job) throws SchedulerException {
        String jobId = job.getId();
        JobKey jobKey = ScheduleUtils.getJobKey(jobId);
        // Remove the old registration first so stale data cannot survive the update
        if (scheduler.checkExists(jobKey)) {
            scheduler.deleteJob(jobKey);
        }
        ScheduleUtils.createScheduleJob(scheduler, job);
    }

    /**
     * Tells whether the given cron expression is valid.
     */
    public boolean checkCronExpressionIsValid(String cronExpression) {
        return CronUtils.isValid(cronExpression);
    }

    /**
     * Rejects job definitions that cannot be scheduled, before anything is written to
     * the database.
     *
     * @throws MSException if the job is {@code null} or its cron expression is missing or invalid
     */
    private void validationJob(SysJob job) {
        if (job == null) {
            throw new MSException("任务不能为空!");
        }
        String cronExpression = job.getCronExpression();
        if (cronExpression == null || !CronUtils.isValid(cronExpression)) {
            throw new MSException("Cron表达式无效!");
        }
    }

}

第六步,编写Controller层

/**
 * REST endpoints for managing scheduled jobs; every operation is delegated to
 * {@link SysJobService}.
 */
@RequiredArgsConstructor
@RestController
@RequestMapping(value = JOB_URI + "jobs")
public class SysJobController {

    private final SysJobService sysJobService;
    private final MapperFacade mapperFacade;

    /**
     * Returns one page of jobs for the given query.
     */
    @GetMapping(value = "page")
    public MSPageInfo<SysJob> page(SysJobQuery query) {
        return sysJobService.page(mapperFacade.map(query, SysJobCriteria.class));
    }

    /**
     * Returns the job with the given id.
     */
    @GetMapping(value = "{id}")
    public SysJob queryById(@PathVariable String id) {
        return sysJobService.selectById(id);
    }

    /**
     * Creates a new job from the request body.
     */
    @PostMapping
    public void add(@RequestBody SysJobRO ro) throws SchedulerException {
        sysJobService.insertJob(mapperFacade.map(ro, SysJob.class));
    }

    /**
     * Updates an existing job from the request body.
     */
    @PutMapping
    public void edit(@RequestBody SysJobRO ro) throws SchedulerException {
        sysJobService.updateJob(mapperFacade.map(ro, SysJob.class));
    }

    /**
     * Applies the status from the request body to the stored job and pauses or resumes it.
     */
    @PutMapping("/changeStatus")
    public void changeStatus(@RequestBody SysJobRO ro) throws SchedulerException {
        SysJob requested = mapperFacade.map(ro, SysJob.class);
        // Only the status is taken from the request; everything else comes from the stored job
        SysJob stored = sysJobService.selectById(requested.getId());
        stored.setStatus(requested.getStatus());
        sysJobService.changeStatus(stored);
    }

    /**
     * Triggers the job identified by the request body once, immediately.
     */
    @PutMapping("/run")
    public void run(@RequestBody SysJobRO ro) throws SchedulerException {
        sysJobService.run(mapperFacade.map(ro, SysJob.class));
    }

    /**
     * Deletes the job with the given id.
     */
    @DeleteMapping("/{id}")
    public void remove(@PathVariable String id) throws SchedulerException {
        sysJobService.deleteById(id);
    }

}
spring boot

关于作者

TimothyC
天不造人上之人,亦不造人下之人
获得点赞
文章被阅读