任务调度框架 JobRunr

2024-06-16, 星期日, 13:34

培训

业务系统中常常会有一些任务调度的需求:

  • 新用户注册成功后需要发送欢迎邮件
  • 在计划的课程即将开始时向学员发送通知

第一个场景中,「发送欢迎邮件」这个动作应该是异步的,换句话说,「发送欢迎邮件」和「返回注册成功的消息」并没有先后或因果的关系,前者不应该阻塞后者。

Java 中处理异步任务有多种方法,例如 new 一个 Thread(或是使用线程池),Spring 框架中的 ApplicationEventPublisher,万能的消息队列、缓存系统,甚至是内存中的数据结构实现。

第二个场景中,最容易想到的做法是创建定期执行的任务,在这个任务中遍历所有课程的开始时间,判断是否有必要向学员发送通知,严谨一些的做法还会加上「是否已经就该课程发送过通知」之类的判断。另一种做法则是在录入这些课程的开课时间后创建对应的提醒任务,之后定期访问任务队列消耗它们。想要更高效一些的话,我们可以将这些任务按照触发时间排个序。

任务调度的需求当然也能用抽象程度更高的任务调度框架解决。JobRunr 是一款功能和 Quartz 差不多的 Java / .NET 任务调度框架。对笔者而言,Quartz 不是很熟所以没什么历史包袱,而 JobRunr 又老在 InfoQ 的《Java 近期新闻》栏目中刷脸。再加上 How to move from Quartz to JobRunr 的广告:

  • 简单且现代化的 API
  • 云原生
  • 支持多种 SQL 和 NoSQL 数据库
  • 内置仪表盘
  • 总之就是非常可靠
  • 灵活且可扩展的架构
  • 活跃的开发者社区

Well, what can i say? ¯\(ツ)

在 Spring Boot 项目中使用

参考 Configuration - Spring Boot Starter 根据自己使用 Spring Boot 2 还是 3 引入  org.jobrunr:jobrunr-spring-boot-2-starter 或 org.jobrunr:jobrunr-spring-boot-3-starter

按文档的说法,「If you only want to schedule jobs, you don’t need to do anything」,不过笔者没明白怎样规划的任务才不是 background jobs,所以你需要在 application.properties 中添加 org.jobrunr.background-job-server.enabled=true,或者在 YAML 中:

org:
  jobrunr:
    background-job-server:
      enabled: true

如此,JobRunr 会在 IoC 容器中搜寻 DataSource 类型的 Bean 并创建一些表和视图来持久化数据,以 MySQL 为例:

# tables
jobrunr_backgroundjobservers
jobrunr_jobs
jobrunr_metadata
jobrunr_migrations
jobrunr_recurring_jobs
# views
jobrunr_jobs_stats

还有一些高级配置,例如开启 Web 仪表盘、改变 15 秒查询一次调度列表的行为等,可以参考 # Advanced Configuration 一节。

如果你通过 org.jobrunr.dashboard.enabled=true 开启了仪表盘,可以访问 http://localhost:8000 查看调度任务,也可以通过仪表盘取消、删除或提前执行任务,因此需注意不能对外暴露访问途径。

后台任务、定时任务和周期性任务

可以使用 org.jobrunr.scheduling.BackgroundJob 提供的一系列 Lambda 风格的静态方法添加需要调度的任务。

本文暂且将那些由当前动作产生不应该阻塞当前动作的任务称为「后台任务」,以表现应当在后台执行之意(可能会和 background job 混淆,而且调度的任务哪一个不是在后台执行的,所以各位读者会意即可)。

以「用户注册后发送欢迎邮件」这个场景举例,任务会进入任务队列,并在合适的时候执行。

JobId jobId = BackgroundJob.enqueue(
  () -> publicRelationService.welcomeOnBoard(userId));

你可以在添加任务的时候传入参数,参数得是基本类型,或任何能够通过 Jackson / Gson 等 JSON 工具序列化和反序列化的类。

方法会返回 org.jobrunr.jobs.JobId 类型的任务标识符,该类型实际上是对 java.util.UUID 的封装,因此二者之间也可以很容易地互相转换。

publicRelationService 是一个注册到 IoC 中的 Bean,在 Spring Boot 中,最简单的方式就是为类加上 @Service@Component 注解。

至于定时任务,以「在课程开始前提醒学员」为例,确定课程开始日期后,设定开课前 3 天向已报名学员发送提醒消息:

JobId jobId = BackgroundJob.schedule(
  courseStartDate.minusDays(3),
  () -> courseService.sendReminder(courseId, courseStartDate));

为什么说 JobRunr 很现代呢?因为 schedule 方法接受 java.time 时间类型,Hurrah! 🎉🎉🎉

回到任务调度的基础,创建定期执行某任务的计划:

String jobId = BackgroundJob.scheduleRecurrently(
  Cron.weekly(DayOfWeek.MONDAY),
  () -> publicRelationService.sendWeeklyNewsletter());

这段代码会设置每周一 00:00 发送新闻邮件的任务。第一个参数可以直接传入字符串格式的 cron 表达式,注意这里应当使用 Unix 风格的表达式(参考 crontab guru),而不要使用 Spring 的 7 段表达式。或者考虑到代码的可读性,也可以像示例中这样使用 org.jobrunr.scheduling.cron.Cron 的方法构造。

你可以看到方法返回的 jobId 变成了字符串类型,这就是任务模版和任务实例的区别(当成类定义和实例就行)。Jobrunr 使用 Lambda 表达式中传递的方法名称作为任务(模版)的 ID。所以执行如下代码并不会创建三个定时任务,因为它们的任务(模版)ID 是一致的。

BackgroundJob.scheduleRecurrently(Cron.weekly(DayOfWeek.MONDAY),
    () -> publicRelationService.sendWeeklyNewsletter());
BackgroundJob.scheduleRecurrently(Cron.weekly(DayOfWeek.TUESDAY),
    () -> publicRelationService.sendWeeklyNewsletter());
BackgroundJob.scheduleRecurrently(Cron.weekly(DayOfWeek.FRIDAY),
    () -> publicRelationService.sendWeeklyNewsletter());

执行这些语句会反复修改 ID 为 cc.ddrpa.playground.jobrunr_playground.PublicRelationService.sendWeeklyNewsletter() 的定时任务计划,直到其最后一次被修改为每周五 00:00 执行。

假如你真的想要在周一、周二和周五的凌晨执行这些任务的话,除了老老实实写组合后的 Cron 表达式,还可以通过含有 id 参数的 scheduleRecurrently 方法为三个周期任务赋予不同的 ID。

如果更习惯 Spring Task Scheduler 风格的代码,也可以使用 org.jobrunr.jobs.annotations.Recurring 注解修饰 Bean 方法。

@Recurring(id = "send-weekly-news-letter-recurring-job", cron = "0 0 * * 5")
public void sendWeeklyNewsletter() {
    userRepository
        .findAll()
        .forEach(user -> logger.info(
            "Greetings {}<{}>, here is your weekly newsletter",
            user.getNickName(), user.getEmail()));
}

取消与终止任务

可以使用 delete(JobId) 方法取消任务,周期性任务则使用 deleteRecurringJob(String)

如果一个任务还在等待队列中,其状态会被标记为「已删除」,并在一段时间后清除。

对已经在运行的任务,JobRunr 会尝试中断(interrupt)正在执行该任务的线程。如果你的代码中有一些可以被中断的方法(例如 IO 操作),保证 InterruptedException 能在方法层面被抛出即可。如果没有,那这个方法就是「不可中断的」,代码将一直执行到任务结束为止。你可以在一些关键节点添加如下代码,插入中断点:

if (Thread.currentThread().isInterrupted())
    throw new InterruptedException();

查询任务状态

如果使用 Spring Boot Starter,则可以获取到 org.jobrunr.server.BackgroundJobServer 类型的 Bean,支持使用 JPA 风格的 API 查询任务列表:

Page<Job> failedJobs = backgroundJobServer
    .getStorageProvider()
    .getJobs(StateName.FAILED, pageRequest)

异常处理和自动重试

如果规划的任务产生了异常,抛出异常就可以让 JobRunr 知道任务没有成功。

Jobrunr 默认会重试 10 次,这个行为可以通过 org.jobrunr.jobs.default-number-of-retries 改变,重试的时间则由 org.jobrunr.jobs.retry-back-off-time-seed 确定。你也可以查询失败的任务,决定手动执行还是删除它们。

其他没有提到的内容

  • JobRunr 有其他任务编排方式,可以更灵活地设置任务,例如与全局配置不同的失败重试次数等
  • JobRunr 支持 Virtual Threads(JDK 21)
  • JobRunr 可以是分布式的

JobRunr architecture