ES
参考分布式定时任务基础使用:ES-JOB——分布式定时任务基础使用_择业的博客-CSDN博客
再上面链接代码后进行代码编译
分布式定时任务:按照分片执行,同一个jar在不同服务器上发布,同一定时任务,按照定时任务的分片执行。
高级用法思想:
一个job用后台创建job方式,后台创建设置管理定时任务(可以修改,不能新增)
用@job自定义注解方式,后台设置创建管理定时任务
1.将数据源与Job关联起来JobEventConfig
package com.bfxy.esjob.config;import javax.sql.DataSource;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;@Configuration
public class JobEventConfig {@Autowiredprivate DataSource dataSource;@Beanpublic JobEventConfiguration jobEventConfiguration() {return new JobEventRdbConfiguration(dataSource);}
}
2.将JobEventConfiguration注入到MySimpleJobConfig
package com.bfxy.esjob.config;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.bfxy.esjob.listener.SimpleJobListener;
import com.bfxy.esjob.task.MySimpleJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;@Configuration
public class MySimpleJobConfig {@Autowiredprivate ZookeeperRegistryCenter registryCenter;@Autowiredprivate JobEventConfiguration jobEventConfiguration;@Beanpublic SimpleJob simpleJob() {return new MySimpleJob();}/*** @param simpleJob* @return*/@Bean(initMethod = "init")public JobScheduler simpleJobScheduler(final SimpleJob simpleJob,@Value("${simpleJob.cron}") final String cron,@Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,@Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters,@Value("${simpleJob.jobParameter}") final String jobParameter,@Value("${simpleJob.failover}") final boolean failover,@Value("${simpleJob.monitorExecution}") final boolean monitorExecution,@Value("${simpleJob.monitorPort}") final int monitorPort,@Value("${simpleJob.maxTimeDiffSeconds}") final int maxTimeDiffSeconds,@Value("${simpleJob.jobShardingStrategyClass}") final String jobShardingStrategyClass) {return new SpringJobScheduler(simpleJob,registryCenter,getLiteJobConfiguration(simpleJob.getClass(),cron,shardingTotalCount,shardingItemParameters,jobParameter,failover,monitorExecution,monitorPort,maxTimeDiffSeconds,jobShardingStrategyClass),jobEventConfiguration,new SimpleJobListener());}private LiteJobConfiguration getLiteJobConfiguration(Class<? extends SimpleJob> jobClass, String cron,int shardingTotalCount, String shardingItemParameters, String jobParameter, boolean failover,boolean monitorExecution, int monitorPort, int maxTimeDiffSeconds, String jobShardingStrategyClass) {JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).misfire(true).failover(failover).jobParameter(jobParameter).shardingItemParameters(shardingItemParameters).build();SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).jobShardingStrategyClass(jobShardingStrategyClass).monitorExecution(monitorExecution).monitorPort(monitorPort).maxTimeDiffSeconds(maxTimeDiffSeconds).overwrite(false).build();return liteJobConfiguration;}}
3.流式Job,DataflowJobConfig/循环执行定时任务,每十秒钟执行一次
/** Copyright 1999-2015 dangdang.com.* <p>* Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** .0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.* </p>*/package com.bfxy.esjob.config;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.bfxy.esjob.task.SpringDataflowJob;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;@Configuration
public class DataflowJobConfig {@Autowiredprivate ZookeeperRegistryCenter regCenter;@Autowiredprivate JobEventConfiguration jobEventConfiguration;@Beanpublic DataflowJob dataflowJob() {return new SpringDataflowJob();}@Bean(initMethod = "init")public JobScheduler dataflowJobScheduler(final DataflowJob dataflowJob, @Value("${dataflowJob.cron}") final String cron,@Value("${dataflowJob.shardingTotalCount}") final int shardingTotalCount,@Value("${dataflowJob.shardingItemParameters}") final String shardingItemParameters) {SpringJobScheduler springJobScheduler = new SpringJobScheduler(dataflowJob, regCenter, getLiteJobConfiguration(dataflowJob.getClass(), cron,shardingTotalCount, shardingItemParameters), jobEventConfiguration);
// springJobScheduler.init();return springJobScheduler;}private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends DataflowJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {return LiteJobConfiguration.newBuilder(new DataflowJobConfiguration(JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName(),true)) //streamingProcess.overwrite(true).build();}
}
相关实体类
package com.bfxy.esjob.entity;public class Foo {private String id;private String name;public Foo() {}public Foo(String id, String name) {super();this.id = id;this.name = name;}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}}
具体job任务SpringDataflowJob
/** Copyright 1999-2015 dangdang.com.* <p>* Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** .0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.* </p>*/package com.bfxy.esjob.task;import java.util.List;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.bfxy.esjob.entity.Foo;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;public class SpringDataflowJob implements DataflowJob<Foo> {private static final Logger LOGGER = LoggerFactory.getLogger(SpringDataflowJob.class);@Overridepublic List<Foo> fetchData(final ShardingContext shardingContext) {System.err.println("--------------@@@@@@@@@@ 抓取数据集合...--------------");return null;}@Overridepublic void processData(final ShardingContext shardingContext, final List<Foo> data) {System.err.println("--------------@@@@@@@@@ 处理数据集合...--------------");}
}