MSIPO技术圈 首页 IT技术 查看内容

Spring Boot 实现定时任务动态管理

2024-03-29

前言

本文主要介绍了SpringBoot架构下动态定时任务的使用,定时任务表达式配置在数据库中,通过反射执行到目标方法。

Quartz

Quartz 是一个开源的作业调度框架,支持分布式定时任务,Quartz定时任务据我了解可分为Trigger(触发器)Job(任务)和Scheduler(调度器),定时任务的逻辑大体为:创建触发器和任务,并将其加入到调度器中。

Quartz 的核心类有以下三部分:

任务 Job : 需要实现的任务类,实现 execute() 方法,执行后完成任务;
触发器 Trigger : 包括 SimpleTrigger 和 CronTrigger;
调度器 Scheduler : 任务调度器,负责基于 Trigger触发器,来执行 Job任务.

Trigger 有五种触发器:

  • SimpleTrigger 触发器:需要在特定的日期/时间启动,且以指定的间隔时间(单位毫秒)重复执行 n 次任务,如 :在 9:00 开始,每隔1小时,每隔几分钟,每隔几秒钟执行一次 。没办法指定每隔一个月执行一次(每月的时间间隔不是固定值)。

  • CalendarIntervalTrigger 触发器:指定从某一个时间开始,以一定的时间间隔(单位有秒,分钟,小时,天,月,年,星期)执行的任务。

  • DailyTimeIntervalTrigger 触发器:指定每天的某个时间段内,以一定的时间间隔执行任务。并且支持指定星期。如:指定每天 9:00 至 18:00 ,每隔 70 秒执行一次,并且只要周一至周五执行。

  • CronTrigger 触发器:基于日历的任务调度器,即指定星期、日期的某时间执行任务。

  • NthIncludedDayTrigger 触发器:不同时间间隔的第 n 天执行任务。比如,在每个月的第 15 日处理财务发票记帐,同样设定双休日或者假期。

创建任务表

create table sys_job (
  job_id              bigint(20)    not null auto_increment    comment '任务ID',
  job_name            varchar(64)   default ''                 comment '任务名称',
  job_group           varchar(64)   default 'DEFAULT'          comment '任务组名',
  invoke_target       varchar(500)  not null                   comment '调用目标方法',
  cron_expression     varchar(255)  default ''                 comment 'cron执行表达式',
  misfire_policy      varchar(20)   default '3'                comment '计划执行错误策略(1立即执行 2执行一次 3放弃执行)',
  concurrent          char(1)       default '1'                comment '是否并发执行(0允许 1禁止)',
  status              char(1)       default '0'                comment '状态(0正常 1暂停)',
  create_by           varchar(64)   default ''                 comment '创建者',
  create_time         datetime                                 comment '创建时间',
  update_by           varchar(64)   default ''                 comment '更新者',
  update_time         datetime                                 comment '更新时间',
  remark              varchar(500)  default ''                 comment '备注信息',
  primary key (job_id, job_name, job_group)
) engine=innodb auto_increment=100 comment = '定时任务调度表';


INSERT INTO `sys_job`(`job_id`, `job_name`, `job_group`, `invoke_target`, `cron_expression`, `misfire_policy`, `concurrent`, `status`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES (2, '系统默认(有参)', 'DEFAULT', 'com.demo.task.Task.testParams(\'hello\')', '0/15 * * * * ?', '3', '1', '0', 'admin', '2024-01-16 19:07:33', '', NULL, '');
INSERT INTO `sys_job`(`job_id`, `job_name`, `job_group`, `invoke_target`, `cron_expression`, `misfire_policy`, `concurrent`, `status`, `create_by`, `create_time`, `update_by`, `update_time`, `remark`) VALUES (3, '系统默认(无参)', 'DEFAULT', 'task.testNoParams()', '0/20 * * * * ?', '3', '1', '0', 'admin', '2024-01-16 19:07:33', '', NULL, '');


create table sys_job_log (
  job_log_id          bigint(20)     not null auto_increment    comment '任务日志ID',
  job_name            varchar(64)    not null                   comment '任务名称',
  job_group           varchar(64)    not null                   comment '任务组名',
  invoke_target       varchar(500)   not null                   comment '调用目标字符串',
  job_message         varchar(500)                              comment '日志信息',
  status              char(1)        default '0'                comment '执行状态(0正常 1失败)',
  exception_info      varchar(2000)  default ''                 comment '异常信息',
  create_time         datetime                                  comment '创建时间',
  primary key (job_log_id)
) engine=innodb comment = '定时任务调度日志表';

添加依赖

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
</dependency>

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
</dependency>

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>4.1.14</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
</dependency>

定义Job

Quartz定时任务默认都是并发执行的,不会等待上一次任务执行完毕,只要间隔时间到就会执行, 如果定时任执行太长,会长时间占用资源,导致其它任务堵塞。

一般设置都是禁止并发执行

//禁止并发执行
@DisallowConcurrentExecution
public class QuartzDisallowConcurrentExecution extends AbstractQuartzJob {
    @Override
    protected void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception {
        JobInvokeUtil.invokeMethod(sysJob);
    }
}


public abstract class AbstractQuartzJob implements Job {
    private static final Logger log = LoggerFactory.getLogger(AbstractQuartzJob.class);

    /**
     * 线程本地变量
     */
    private static ThreadLocal<Date> threadLocal = new ThreadLocal<>();

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        SysJob sysJob = new SysJob();
        BeanUtils.copyProperties(context.getMergedJobDataMap().get(ScheduleConstants.TASK_PROPERTIES),sysJob);
        try {
            before(context, sysJob);
            if (sysJob != null)
            {
                doExecute(context, sysJob);
            }
            after(context, sysJob, null);
        }
        catch (Exception e)
        {
            log.error("任务执行异常  - :", e);
            after(context, sysJob, e);
        }
    }

    /**
     * 执行前
     *
     * @param context 工作执行上下文对象
     * @param sysJob 系统计划任务
     */
    protected void before(JobExecutionContext context, SysJob sysJob) {
        threadLocal.set(new Date());
    }

    /**
     * 执行后
     *
     * @param context 工作执行上下文对象
     * @param sysJob 系统计划任务
     */
    protected void after(JobExecutionContext context, SysJob sysJob, Exception e){
        Date startTime = threadLocal.get();
        threadLocal.remove();

        // todo 写入数据库当中

    }

    /**
     * 执行方法,由子类重载
     *
     * @param context 工作执行上下文对象
     * @param sysJob 系统计划任务
     * @throws Exception 执行过程中的异常
     */
    protected abstract void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception;
}


实体类

@Data
public class SysJob implements Serializable {
    private static final long serialVersionUID = 1L;

    /** 任务ID */
    private Long jobId;

    /** 任务名称 */
    private String jobName;

    /** 任务组名 */
    private String jobGroup;

    /** 调用目标字符串 */
    private String invokeTarget;

    /** cron执行表达式 */
    private String cronExpression;

    /** cron计划策略 */
    // 0=默认,1=立即触发执行,2=触发一次执行,3=不触发立即执行
    private String misfirePolicy = ScheduleConstants.MISFIRE_DEFAULT;

    /** 是否并发执行(0允许 1禁止) */
    private String concurrent;

    /** 任务状态(0正常 1暂停) */
    private String status;


}

创建定时任务

    public static void createScheduleJob(Scheduler scheduler, SysJob job) throws SchedulerException, TaskException {
        Class<? extends Job> jobClass = getQuartzJobClass(job);
        // 构建job信息
        Long jobId = job.getJobId();
        String jobGroup = job.getJobGroup();
        JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(getJobKey(jobId, jobGroup)).build();

        // 表达式调度构建器
        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
        cronScheduleBuilder = handleCronScheduleMisfirePolicy(job, cronScheduleBuilder);

        // 按新的cronExpression表达式构建一个新的trigger
        CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(getTriggerKey(jobId, jobGroup))
                .withSchedule(cronScheduleBuilder).build();

        // 放入参数,运行时的方法可以获取
        jobDetail.getJobDataMap().put(ScheduleConstants.TASK_PROPERTIES, job);

        // 判断是否存在
        if (scheduler.checkExists(getJobKey(jobId, jobGroup))){
            // 防止创建时存在数据问题 先移除,然后在执行创建操作
            scheduler.deleteJob(getJobKey(jobId, jobGroup));
        }

        // 判断任务是否过期
        if (StringUtils.isNotNull(CronUtils.getNextExecution(job.getCronExpression()))){
            // 执行调度任务 核心代码
            scheduler.scheduleJob(jobDetail, trigger);
        }

        // 暂停任务
        if (job.getStatus().equals(ScheduleConstants.Status.PAUSE.getValue())){
            scheduler.pauseJob(ScheduleUtils.getJobKey(jobId, jobGroup));
        }
    }

    /**
     * 获取quartz任务类
     *
     * @param sysJob 执行计划
     * @return 具体执行任务类
     */
    private static Class<? extends Job> getQuartzJobClass(SysJob sysJob){
        boolean isConcurrent = "0".equals(sysJob.getConcurrent());
        return isConcurrent ? QuartzJobExecution.class : QuartzDisallowConcurrentExecution.class;
    }

反射类

public class JobInvokeUtil {
    /**
     * 执行方法
     *
     * @param sysJob 系统任务
     */
    public static void invokeMethod(SysJob sysJob) throws Exception {
        String invokeTarget = sysJob.getInvokeTarget();
        String beanName = getBeanName(invokeTarget);
        String methodName = getMethodName(invokeTarget);
        List<Object[]> methodParams = getMethodParams(invokeTarget);

        if (!isValidClassName(beanName)) {

            Object bean = SpringUtils.getBean(beanName);
            invokeMethod(bean, methodName, methodParams);
        }
        else{
            Object bean = Class.forName(beanName).getDeclaredConstructor().newInstance();
            invokeMethod(bean, methodName, methodParams);
        }
    }
    
      /**
     * 调用任务方法
     *
     * @param bean 目标对象
     * @param methodName 方法名称
     * @param methodParams 方法参数
     */
    private static void invokeMethod(Object bean, String methodName, List<Object[]> methodParams)
            throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException,
            InvocationTargetException {
        if (StringUtils.isNotNull(methodParams) && methodParams.size() > 0) {
            Method method = bean.getClass().getMethod(methodName, getMethodParamsType(methodParams));
            method.invoke(bean, getMethodParamsValue(methodParams));
        }
        else{
            Method method = bean.getClass().getMethod(methodName);
            method.invoke(bean);
        }
    }
 }

定时任务类

@Component("task")
@Slf4j
public class Task {


    public void testParams(String params) {
        log.info("执行有参方法:" + params);
        System.out.println();
    }

    public void testNoParams() {
        log.info("执行无参方法");
    }
}

初始化定时任务

    @PostConstruct
    public void init() throws SchedulerException, TaskException {
        scheduler.clear();

        List<SysJob> jobList = jobMapper.selectList(null);
        for (SysJob job : jobList) {
            ScheduleUtils.createScheduleJob(scheduler, job);
        }
    }

运行效果:

2024-03-25 14:05:30.020 INFO 11296 — [eduler_Worker-1] com.demo.task.Task : 执行有参方法:hello

2024-03-25 14:05:40.005 INFO 11296 — [eduler_Worker-2] com.demo.task.Task : 执行无参方法
2024-03-25 14:05:45.008 INFO 11296 — [eduler_Worker-3] com.demo.task.Task : 执行有参方法:hello

2024-03-25 14:06:00.012 INFO 11296 — [eduler_Worker-4] com.demo.task.Task : 执行有参方法:hello

2024-03-25 14:06:00.014 INFO 11296 — [eduler_Worker-5] com.demo.task.Task : 执行无参方法

添加定时任务

    public int insertJob(SysJob job) throws SchedulerException, TaskException {
        job.setStatus(ScheduleConstants.Status.PAUSE.getValue());
        int rows = jobMapper.insert(job);
        if (rows > 0) {
            ScheduleUtils.createScheduleJob(scheduler, job);
        }
        return rows;
    }

解决 Quartz Job 中无法注入 Spring Bean

首先自定义一个 JobFactory,通过 AutowireCapableBeanFactory 将创建好的 Job 对象交给 Spring 管理

@Configuration
public class CustomJobFactory extends AdaptableJobFactory {
    @Autowired
    private AutowireCapableBeanFactory autowireCapableBeanFactory;

    /**
     * Create the job instance, populating it with property values taken
     * from the scheduler context, job data map and trigger data map.
     *
     * @param bundle
     */
    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        Object jobInstance = super.createJobInstance(bundle);
        autowireCapableBeanFactory.autowireBean(jobInstance);
        return jobInstance;
    }
}

再创建一个配置类,将自定义的 JobFactory 设置到 Schedule

@Configuration
public class QuartzConfig {
    @Autowired
    private CustomJobFactory customJobFactory;

    @SneakyThrows
    @Bean
    public Scheduler scheduler(){

相关阅读

热门文章