Quartz
最近有需求需要研究下任务调度框架,看了才发现原来之前有用过,只不过就搭了下 demo 。Quartz 是一个任务调度框架。可以用来运行定时任务,例如定时发邮件,定时爬数据(采集交易所记录进行分析,提醒你快爆仓啦),定时自定义任务(运行程序)等。给予一个触发条件,到点即触发相应的 job。类似 linux 下的 crontab 定时任务
cron 表达式
cron 表达式是 Linux 系统用来设计计划任务的,比如:每天晚上 12 点重启指定应用,到定时运行指定的脚本。
cron 格式:
cron表现为一个字符串,字符串包含 6-7 个字段,字段间空格相隔,这个字段允许填入的任何值或特殊字符。
| 字段 | 允许值 | 允许的特殊字符 |
|---|---|---|
| 秒 | 0-59 | , - * / |
| 分 | 0-59 | , - * / |
| 小时 | 0-23 | , - * / |
| 日期 | 1-31 | , - * / L W C |
| 月份 | 1-12 or JAN-DEC | , - * / |
| 星期 | 1-7 or SUN-SAT | , - * / L C # |
| 年 | 留空,1970-2099 | , - * / |
特殊字符所表示的含义:
* 指定所有值。如:在分钟字段表示 “每分钟”
- 指定一个范围。如 10-12 在小时字段表示 “10点、11点、12点”
, 指定另外的值。如:MON,WED,FRI 在星期字段表示 “周一、周三、周五”
? 只在日期和星期字段中使用。指定 “非明确的值”。需要通过在这两个字段中指定一些东西的时候。
L 指定在用或者星期中的某天(最后一天)。“Last” 的缩写,在星期中表示为 “7” 或者 “SAT”,在月中表示月份中最后一天 ”1月31日,2 月 28 日“。如果在星期字段某个 value 后面,则表示 ”某月的最后一个星期 value“,如 ”6L“ 表示某月的最后一个星期五。(注意: cron 中 1 表示周日,7 表示周六)
W 只能用在月份字段中,指定了离指定日期最近的那个星期日
# 只能用在星期字段,指定了第 n 个星期 value 在某月中
通常 cron 可以用在线工具生成。在线Cron表达式生成器
创建一个 Quartz demo
选择 Spring生成器 -> I/O -> Quartz Scheduler
可以看到 pom.xml 引用了
1 | <dependency> |
在 Application 上启用 Scheduling 。注解 @EnableScheduling 。
创建一个 job 类 PrintCurrentTimeTask.java
1 | // 注册为组件 |
这样就完成了一个简单的 demo。
定时任务的基本概念:
- 任务:即被触发的业务内容。
- 执行器:即执行任务的机器。
- 触发器:即满足定义条件(cron)就会被自动调用。cron 也可以任务是触发事件(条件)。
- Quartz 中 @Scheduled 可以任务是触发器,执行 PrintCurrentTimeTask 的机器即执行器,printCurrentTime 业务代码即任务。
实际应用中,可能需要分布式调度框架。这样的单机运行方式并不能满足需求
- 无法集群部署,可能会出现任务重复执行。
- 不支持统一生命周期管理,控制启动、关闭任务。
- 不支持分片任务, 多机器分片执行处理不同数据
- 不支持失败重试,出现异常后任务终止,不能根据状态控制任务重新执行。
- 无报警机制,任务失败后没有报警机制。
- 任务数据统计难以统计,数据量大时对任务执行情况无法高效统计执行情况。
- 不支持动态调整,不重启服务的情况下修改任务参数。
使用任务调度系统是 ETL(抽取-转换-加载:Extract-Transform-Load) 任务的重要手段,例如定时抽取业务数据库数据,定时运行 hive/spark 任务,推送某些指标类数据。调度系统分两大类:定时分片类作业调度系统和DAG工作流类作业调度系统。
分布式任务调度框架有 elasticjob 和 xxl-job 等,elasticjob 是面对高并发复杂业务而生,以来 Zookeeper 使用服务发现与注册(主节点选举,去中心化,Zookeeper 分布式锁),具有高可用、强一致性和较好的扩展性(伸缩立方,X轴 水平扩展)。xxk-job 是轻量级,开箱即用,操作简单,企业维护成本不高等等优势,有非常多企业选择 xxl-job。
XXL-JOB
安装使用步骤:
- 到官网拉取 master 分支。
- 将 xxl-job -> doc -> *.sql 导入到数据库中
- 修改 xxl-job-admin -> resources -> application.propertis :spring.datasource 的 username 和 password
- 启动 xxl-job-admin
打开任务调度中心: http://localhost:8080/xxl-job-admin/toLogin (admin/123456)
新建 一个 bean demo
在 xxl-job-executor-sample-springboot 下的 com.xxl.job.executor.service.jobhandle 新建一个 DemoXxJob 类
1
2
3
4
5
6
7
8
9
10
11
12
public class DemoXxJob {
// 声明一个 jobhandler
(value = "demoJobHandle")
public ReturnT<String> execute(String param) throws Exception{
// 简单输出日志
XxlJobHelper.log("XXL-JOB, HELLO WORLD.");
System.out.println("XXL-JOB, HELLO WORLD.");
return ReturnT.SUCCESS;
}
}
进入任务调度中心添加一个任务管理项 CRON 为 ’ 0 1 * * * ? * ‘,允许模式为 0’ BEAN ‘,jobhandler为 ’ demoJobHandle ‘
修改 *-smple-springboot 下的 application.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19### admin 调度中心的通讯地址 多个地址则用逗号分隔 执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
=http://127.0.0.1:8080/xxl-job-admin
### 执行器通讯TOKEN:非空时启用;
=
### 执行器AppName:执行器心跳注册分组依据;为空则关闭自动注册
=xxl-job-executor-sample
### 执行器注册:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
=
### 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
### 多网卡模式下,使用指定的网络 IP 运行。
=
### RPC 模块的端口,执行器端口号:小于等于0则自动获取;
=9999
### 日志目录
=/data/applogs/xxl-job/jobhandler
### 定时清理日志
=30
启动 spring application 程序
在调度中心启动任务(启动一次)
在项目下控制台 输出 XXL-JOB, HELLO WORLD.
分片 demo 方法代码, 运行两个 JobExecutorApplication。
1 | // 分片 demo |
官方架构图 PPT:
- v1.5 与 v1.7 版本变化较大
解读源码的疑问:
- 调度线程池 (JobTriggerPoolHelper)下实现了 fast 和 slow 线程池,作用是让其执行速度较慢的任务分配到 slow 线程池,避免耗尽调度线程,提高系统稳定性。
- 路由的策略模式:一致性hash(hash 环,hash 碰撞)、故障转移、忙碌转移
- 自研 RPC 。服务的发现与注册(执行器自动注册实现),心跳。(ExecutorBiz、AdminBiz)客戶端利用 AdminBiz.registry(registryParam) 定期向 admin 发送注册信息(ExecutorRegistryThread,EmbedServer )。admin 接收信息后,维护 xxl_job_registry 表的 update_time 字段。admin 在 JobRegistryMonitorHelper 中启动线程扫描 xxl_job_registry 表 ,移除超时元组,并将在线实例集合拼接在一起更新到执行器地址为自动注册的执行器 address_list 字段中。
- ThreadLocal(ThreadLocalMap 内存泄漏) 线程变量,实现任务隔离。InheritableThreadLocal (子任务(子线程)数据共享?)
- XxlJobSpringExecutor 遍历容器 bean 通过解析 XxlJob 注解 拿到 XxlJob 类,封装成 jobHandler 存储在 jobHandlerRepository。由 XxlJobExecutor 处理器的 registJobThread 方法启动一条线程运行任务 。
- NIO,Reactor 模式
- Glue 模式
- GlueFactory
- 通过 AnnotationUtils 解析注解信息,从 ApplicationContext 拿到 bean
- 任务完成信息回调。TriggerCallbackThread
- 调度中心启动过程 XxlJobScheduler
- i18n
- 注册监听器
- 启动失败日志监听器
- 初始化 RPC
- 启动定时任务调度器(执行任务缓存任务)
- 执行器启动过程 XxlJobSpringExecutor(调用 XxlJobExecutor.start)
- 注册 JobHandler 到缓存 (方法)
- 刷新 glue
- 核心启动(super.start\ XxlJobExecutor.start):
- 初始化日志路径
- 初始化注册中心列表
- 启动文件清理线程 (每日清理,配置参数 大于3 才生效)
- 启动触发器回调线程
- 初始化 RPC
- 调度策略执行流程 scheduleThread,ringThread
参考:
xxl-job:
https://www.xuxueli.com/xxl-job/
https://github.com/xuxueli/xxl-job/tree/master/doc
https://blog.csdn.net/god_86/article/details/114610421
https://www.cnblogs.com/guoyinli/p/11555035.html
调度系统:
- 本文作者: MISAKIGA
- 本文链接: https://misakiga.github.io/2021/06/21/microservice/Distributed_Scheduler/
- 版权声明: 本博客所有文章除特别声明外,均采用 MIT 许可协议。转载请注明出处!
