Elastic
多情只有春庭月,犹为离人照落花。
概述因项目中使用到定时任务,且服务部署多实例,因此需要解决定时任务重复执行的问题。即在同一时间点,每一个定时任务只在一个节点上执行。常见的开源方案,如 Elastic-job 、 xxl-job 、quartz 、 saturn、 opencron 、 antares 等。最终决定使用elastic-job。elastic-job的亮点主要如下:
基于quartz 定时任务框架为基础的,因此具备quartz的大部分功能使用zookeeper做协调,调度中心,更加轻量级支持任务的分片支持弹性扩容 , 可以水平扩展 , 当任务再次运行时,会检查当前的服务器数量,重新分片,分片结束之后才会继续执行任务失效转移,容错处理,当一台调度服务器宕机或者跟zookeeper断开连接之后,会立即停止作业,然后再去寻找其他空闲的调度服务器,来运行剩余的任务提供运维界面,可以管理作业和注册中心但在实际开发中发现elastic-job对于动态添加的定时任务不支持分片。即在多实例情况下,在某个实例上动态添加任务,则该任务会一直在这一台节点上运行。如果需要在其它实例上运行,则需要以相同的参数调用其它实例接口。参考:elastic-job:动态进行任务的添加。在多次百度+google下发现Elastic-Job动态添加任务这里与楼主遇到了相同的问题。但经楼主测试动态添加任务的分片时好时坏,且只要在zookeeper中注册了任务,重启时任务还是会自动初始化。(关于对动态呢任务的描述,可以参考上面链接的描述,此处不在做过多的解释)。
解决顺着尹大的思路,将任务的节点都集中管理起来,无论动态任务在哪个节点上进行注册,都需要将这个请求转发到其他的节点上进行初始化操作,这样就可以保证多节点分片的任务正常执行。
代码如下:
/*** 开启任务监听,当有任务添加时,监听zk中的数据增加,自动在其他节点也初始化该任务*/public void monitorJobRegister() {CuratorFramework client = zookeeperRegistryCenter.getClient();@SuppressWarnings("resource")PathChildrenCache childrenCache = new PathChildrenCache(client, "/", true);PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {ChildData data = event.getData();switch (event.getType()) {case CHILD_ADDED:String config = new String(client.getData().forPath(data.getPath() + "/config"));Job job = JsonUtils.toBean(Job.class, config);Object bean = null;// 获取bean失败则添加任务try {bean = ctx.getBean("SpringJobScheduler" + job.getJobName());} catch (BeansException e) {logger.error("ERROR NO BEAN,CREATE BEAN SpringJobScheduler" + job.getJobName());}if (Objects.isNull(bean)) {addJob(job);}break;default:break;}}};childrenCache.getListenable().addListener(childrenCacheListener);try {// blog.csdn.net/u010402202/article/details/79581575childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);} catch (Exception e) {e.printStackTrace();}} 测试测试动态添加定时任务,支持分片失效转移。
下载elastic-job-spring-boot-starter 使用maven 命令install到本地创建 demo-elastic-job项目 目录结构如下: demo-elastic-job├── mvnw├── mvnw.cmd├── src│ ├── main│ │ ├── java│ │ │ └── │ │ │ └── example│ │ │ └── demo│ │ │ ├── job│ │ │ │ ├── DynamicJob.java│ │ │ │ └── TestJob.java│ │ │ └── DemoApplication.java│ │ └── resources│ │ ├── application.yml│ │ └── application-dev.yml│ └── test│ └── java│ └── │ └── example│ └── demo│ └── DemoApplicationTests.java├── pom.xml└── demo.iml pom.xml <?xml version="1.0" encoding="UTF-8"?><project xmlns="maven.apache./POM/4.0.0" xmlns:xsi="w3./2001/XMLSchema-instance"xsi:schemaLocation="maven.apache./POM/4.0.0 maven.apache./xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.1.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>.example</groupId><artifactId>demo</artifactId><version>0.0.1-SNAPSHOT</version><name>demo</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.10.0</version></dependency><dependency><groupId>.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.10.0</version></dependency><dependency><groupId>.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>.cxytiandi</groupId><artifactId>elastic-job-spring-boot-starter</artifactId><version>1.0.4</version></dependency><dependency><groupId>.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project> DemoApplication.java package .example.demo;import .cxytiandi.elasticjob.annotation.EnableElasticJob;import .cxytiandi.elasticjob.dynamic.service.JobService;import .springframework.beans.factory.annotation.Autowired;import .springframework.boot.CommandLineRunner;import .springframework.boot.SpringApplication;import .springframework.boot.autoconfigure.SpringBootApplication;import .springframework.context.annotation.ComponentScan;@SpringBootApplication@EnableElasticJob@ComponentScan(basePackages = {".cxytiandi", ".example.demo"})public class DemoApplication implements CommandLineRunner {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}@Autowiredprivate JobService jobService;@Overridepublic void run(String... args) throws Exception {// 模拟初始化读取数据库 添加任务// Job job1 = new Job();// job1.setJobName("job1");// job1.setCron("0/10 * * * * ? ");// job1.setJobType("SIMPLE");// job1.setJobClass(".example.demo.job.DynamicJob");// job1.setShardingItemParameters("");// job1.setShardingTotalCount(2);// jobService.addJob(job1);// Job job2 = new Job();// job2.setJobName("job2");// job2.setCron("0/10 * * * * ? ");// job2.setJobType("SIMPLE");// job2.setJobClass(".example.demo.job.DynamicJob");// job2.setShardingItemParameters("0=A,1=B");// job2.setShardingTotalCount(2);// jobService.addJob(job2);}} TestJob.java package .example.demo.job;import .cxytiandi.elasticjob.annotation.ElasticJobConf;import .dangdang.ddframe.job.api.ShardingContext;import .dangdang.ddframe.job.api.simple.SimpleJob;import lombok.extern.slf4j.Slf4j;import .apache.mons.lang3.StringUtils;import .springframework.stereotype.Component;/*** Created by zhenglongfei on 2019/7/22** @VERSION 1.0*/@Component@Slf4j@ElasticJobConf(name = "dayJob", cron = "0/10 * * * * ?", shardingTotalCount = 2,shardingItemParameters = "0=AAAA,1=BBBB", description = "简单任务", failover = true)public class TestJob implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {log.info("TestJob任务名:【{}】, 片数:【{}】, param=【{}】", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),shardingContext.getShardingParameter());}} DynamicJob.java package .example.demo.job;import .dangdang.ddframe.job.api.ShardingContext;import .dangdang.ddframe.job.api.simple.SimpleJob;import lombok.extern.slf4j.Slf4j;import .springframework.stereotype.Component;/*** Created by zhenglongfei on 2019/7/24** @VERSION 1.0*/@Component@Slf4jpublic class DynamicJob implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {switch (shardingContext.getShardingItem()) {case 0:log.info("【0】 is running");break;case 1:log.info("【1】 is running");break;}}} application.yml elastic:job:zk:serverLists: 172.25.66.137:2181namespace: demo_testserver:port: 8082spring:redis:host: 127.0.0.1port: 6379 测试结果启动两个项目分别为8081和8082端口,使用REST API来动态的注册任务。
joblocalhost:8081/job post 参数如下:
{"jobName": "DynamicJob01","cron": "0/3 * * * * ?","jobType": "SIMPLE","jobClass": ".example.demo.job.DynamicJob","jobParameter": "test","shardingTotalCount": 2,"shardingItemParameters": "0=AAAA,1=BBBB"} 代码下载 github:demo-elastic-jobgithub:elastic-job-spring-boot-starter???关注微信公众号java干货 不定期分享干货资料
参考链接:
Elastic-Job动态添加任务elastic-job:动态进行任务的添加Curator之PathChildrenCache的那些坑转载于:blogs./merryyou/p/11549294.html
Elastic