Understanding how @Scheduled works in Spring through the source code, and implementing dynamic loading of scheduled tasks
Background
My recent projects all involve data synchronization and need tasks to run on a schedule. I previously used scheduling frameworks such as Quartz, XXL-JOB and Easy Scheduler, but came to feel they were too heavyweight, which led me to the scheduling module built into Spring. The native module is purely in-memory: it supports neither task persistence nor external configuration (tasks are hard-coded through the @Scheduled annotation and cannot be pulled out of the class). The plan is therefore to understand the underlying principles of the scheduling module and build a simple wheel on top of it that supports configuring scheduled tasks through a configuration file or a JDBC data source.
Scheduling module
The scheduling module is a package inside the spring-context dependency: org.springframework.scheduling.
There are not many classes in this module, and they are organized into four sub-packages.
To use scheduling on its own, you only need the spring-context dependency. Spring Boot is the popular choice today, though, and spring-boot-starter-web already pulls in spring-context, so the scheduling module can be used directly. At the time of writing (2020-03-14), the latest Spring Boot version is 2.2.5.RELEASE, which can be used both for source code analysis and in production:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spring.boot.version>2.2.5.RELEASE</spring.boot.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
To enable the scheduling module, add the @EnableScheduling annotation to a configuration class. To make it obvious that the module is in use, it is usually best to put the annotation on the startup class, for example:
@EnableScheduling
@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}
Workflow of the scheduling module
This figure describes the workflow of the scheduling module. The analysis here covers the non-XML configuration path (the branch on the right):
TaskScheduler
The scheduling module accepts a scheduler of type TaskScheduler or ScheduledExecutorService. The latter is an interface from the JDK package java.util.concurrent, and its common implementation is the scheduling thread pool ScheduledThreadPoolExecutor. Instances of ScheduledExecutorService are eventually converted into a ConcurrentTaskScheduler through the adapter pattern, so only schedulers of type TaskScheduler need to be analyzed here.
In other words, the built-in scheduler types all rely on the JUC scheduling thread pool ScheduledThreadPoolExecutor underneath. Here are the functions offered by the top-level interface org.springframework.scheduling.TaskScheduler (default methods that merely duplicate these functions have been left out):
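// Default methods with equivalent functionality are omitted
public interface TaskScheduler {

    // Schedule a task; the Trigger instance determines the firing times
    ScheduledFuture<?> schedule(Runnable task, Trigger trigger);

    // Schedule a task at the given start time (single execution)
    ScheduledFuture<?> schedule(Runnable task, Date startTime);

    // Schedule a task at a fixed rate; period is in milliseconds
    ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);

    // Schedule a task at a fixed rate from the given start time; period is in milliseconds
    ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);

    // Schedule a task with a fixed delay between runs; delay is in milliseconds
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);

    // Schedule a task with a fixed delay from the given start time; delay is in milliseconds
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
}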
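The adapter idea mentioned above (a ScheduledExecutorService exposed through a scheduler interface) can be sketched with the JDK alone. This is only an illustration, not Spring's ConcurrentTaskScheduler: MiniTaskScheduler, ExecutorBackedScheduler and AdapterDemo are hypothetical names, and only two of the interface methods are mirrored.

```java
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, trimmed-down stand-in for the TaskScheduler interface.
interface MiniTaskScheduler {
    ScheduledFuture<?> schedule(Runnable task, Date startTime);

    ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long periodMillis);
}

// Adapter: exposes a JDK ScheduledExecutorService through the interface above.
final class ExecutorBackedScheduler implements MiniTaskScheduler {
    private final ScheduledExecutorService executor;

    ExecutorBackedScheduler(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable task, Date startTime) {
        // The JDK API wants a relative delay, so convert the absolute start time.
        long delay = Math.max(0L, startTime.getTime() - System.currentTimeMillis());
        return executor.schedule(task, delay, TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long periodMillis) {
        return executor.scheduleAtFixedRate(task, 0L, periodMillis, TimeUnit.MILLISECONDS);
    }
}

final class AdapterDemo {
    // Returns true if the one-shot task ran within the timeout.
    static boolean runOnce() throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            MiniTaskScheduler scheduler = new ExecutorBackedScheduler(executor);
            CountDownLatch latch = new CountDownLatch(1);
            scheduler.schedule(latch::countDown, new Date(System.currentTimeMillis() + 50));
            return latch.await(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("task ran: " + runOnce());
    }
}
```

The point of the adapter is that callers program against one interface while the thread pool behind it can be swapped freely.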
Classification of tasks
The scheduling module supports different types of tasks, mainly the following three (listed in the order in which they are resolved): CronTask, FixedDelayTask and FixedRateTask.
Here are some simple examples of each type. A CronTask specifies its execution cycle through a cron expression and does not support a delayed start. The special value "-" disables the task:
// Declarative usage via annotation: run every five seconds; initialDelay is not supported
@Scheduled(cron = "*/5 * * * * ?")
public void processTask(){
}

// Declarative usage via annotation: disable the task
@Scheduled(cron = "-")
public void processTask(){
}

// Programmatic usage
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.CronTask;

public class Tasks {

    static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws Exception {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        CronTask cronTask = new CronTask(() -> {
            System.out.println(String.format("[%s] - CronTask triggered...", F.format(LocalDateTime.now())));
        }, "*/5 * * * * ?");
        taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
        Thread.sleep(Integer.MAX_VALUE);
    }
}

// Output of one run
[2020-03-16 01:07:00] - CronTask triggered...
[2020-03-16 01:07:05] - CronTask triggered...
......
A FixedDelayTask needs a delay interval (fixedDelay or fixedDelayString) and, optionally, an initial delay before the first run (initialDelay or initialDelayString). Note that fixedDelayString and initialDelayString are both resolved through the embeddedValueResolver (think of it as the property processor for the configuration file), and both accept the Duration format (for example, P2D is parsed as 2 days, i.e. 172800 seconds):
// Declarative usage via annotation: start after one second, then wait 5 seconds between runs
@Scheduled(fixedDelay = 5000, initialDelay = 1000)
public void process(){
}

// Declarative usage via annotation: the Spring Boot configuration file contains
// process.task.fixedDelay=5000 and process.task.initialDelay=1000
@Scheduled(fixedDelayString = "${process.task.fixedDelay}", initialDelayString = "${process.task.initialDelay}")
public void process(){
}

// Programmatic usage
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.FixedDelayTask;

public class Tasks {

    static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws Exception {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        // interval = 5000 ms, initial delay = 1000 ms
        FixedDelayTask fixedDelayTask = new FixedDelayTask(() -> {
            System.out.println(String.format("[%s] - FixedDelayTask triggered...", F.format(LocalDateTime.now())));
        }, 5000, 1000);
        Date startTime = new Date(System.currentTimeMillis() + fixedDelayTask.getInitialDelay());
        taskScheduler.scheduleWithFixedDelay(fixedDelayTask.getRunnable(), startTime, fixedDelayTask.getInterval());
        Thread.sleep(Integer.MAX_VALUE);
    }
}

// Output of one run
[2020-03-16 01:06:12] - FixedDelayTask triggered...
[2020-03-16 01:06:17] - FixedDelayTask triggered...
......
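The Duration format mentioned for fixedDelayString and initialDelayString can be checked with the JDK alone. The sketch below assumes a parsing rule of "ISO-8601 duration if the value starts with P (optionally after a sign), otherwise plain milliseconds"; DelayParser is a hypothetical class name used only for this illustration.

```java
import java.time.Duration;

final class DelayParser {

    // ISO-8601 durations such as "PT5S" or "P2D" go through Duration.parse;
    // anything else is treated as a plain millisecond value.
    static long parseDelayAsMillis(String value) {
        if (value.length() > 1 && (isP(value.charAt(0)) || isP(value.charAt(1)))) {
            return Duration.parse(value).toMillis();
        }
        return Long.parseLong(value);
    }

    private static boolean isP(char ch) {
        return ch == 'P' || ch == 'p';
    }

    public static void main(String[] args) {
        System.out.println(parseDelayAsMillis("5000")); // 5000
        System.out.println(parseDelayAsMillis("PT5S")); // 5000
        System.out.println(parseDelayAsMillis("P2D"));  // 172800000 (2 days)
    }
}
```

Checking the second character as well lets signed durations such as "-PT5S" take the Duration branch.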
A FixedRateTask needs a fixed interval (fixedRate or fixedRateString) and, optionally, an initial delay before the first run (initialDelay or initialDelayString). Note that fixedRateString and initialDelayString are both resolved through the embeddedValueResolver (think of it as the property processor for the configuration file), and both accept the Duration format (for example, P2D is parsed as 2 days, i.e. 172800 seconds):
// Declarative usage via annotation: start after one second, then run every 5 seconds
@Scheduled(fixedRate = 5000, initialDelay = 1000)
public void processTask(){
}

// Declarative usage via annotation: the Spring Boot configuration file contains
// process.task.fixedRate=5000 and process.task.initialDelay=1000
@Scheduled(fixedRateString = "${process.task.fixedRate}", initialDelayString = "${process.task.initialDelay}")
public void process(){
}

// Programmatic usage
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.FixedRateTask;

public class Tasks {

    static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws Exception {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        // interval = 5000 ms, initial delay = 1000 ms
        FixedRateTask fixedRateTask = new FixedRateTask(() -> {
            System.out.println(String.format("[%s] - FixedRateTask triggered...", F.format(LocalDateTime.now())));
        }, 5000, 1000);
        Date startTime = new Date(System.currentTimeMillis() + fixedRateTask.getInitialDelay());
        taskScheduler.scheduleAtFixedRate(fixedRateTask.getRunnable(), startTime, fixedRateTask.getInterval());
        Thread.sleep(Integer.MAX_VALUE);
    }
}

// Output of one run
[2020-03-16 23:58:25] - FixedRateTask triggered...
[2020-03-16 23:58:30] - FixedRateTask triggered...
......
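The example outputs for FixedDelayTask and FixedRateTask look identical because the task body finishes almost instantly. The difference shows once a run takes noticeable time. The following sketch computes idealized start times only (no real scheduling); ScheduleTimeline is a hypothetical helper, and the fixed-rate case assumes each run finishes within one period.

```java
import java.util.ArrayList;
import java.util.List;

final class ScheduleTimeline {

    // Fixed rate: start times are anchored to the first start and do not depend
    // on how long a run takes (assuming a run finishes within one period).
    static List<Long> fixedRateStarts(long initialDelay, long period, int runs) {
        List<Long> starts = new ArrayList<>();
        for (int i = 0; i < runs; i++) {
            starts.add(initialDelay + i * period);
        }
        return starts;
    }

    // Fixed delay: the next start is measured from the END of the previous run.
    static List<Long> fixedDelayStarts(long initialDelay, long delay, long runTime, int runs) {
        List<Long> starts = new ArrayList<>();
        long next = initialDelay;
        for (int i = 0; i < runs; i++) {
            starts.add(next);
            next += runTime + delay;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Each run takes 2000 ms; interval 5000 ms; initial delay 1000 ms.
        System.out.println(fixedRateStarts(1000, 5000, 3));        // [1000, 6000, 11000]
        System.out.println(fixedDelayStarts(1000, 5000, 2000, 3)); // [1000, 8000, 15000]
    }
}
```

With a 2-second task, fixed rate keeps a 5-second cadence while fixed delay drifts to a 7-second cadence.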
A brief analysis of the core source code
In Spring Boot's annotation-based setup, nearly all of the scheduling module's logic lives in ScheduledAnnotationBeanPostProcessor and ScheduledTaskRegistrar. Generally speaking, the interfaces a class implements indicate the functions it provides, so first look at the interfaces implemented by ScheduledAnnotationBeanPostProcessor:
The hook interfaces analyzed above can be used as needed in a Spring Boot application. Once you understand when each hook is called back, you can do the right work at the right moment to get the effect you want.
The parsing of the @Scheduled annotation is concentrated in the postProcessAfterInitialization() method:
public Object postProcessAfterInitialization(Object bean, String beanName) {
    // Skip beans of type AopInfrastructureBean, TaskScheduler and ScheduledExecutorService
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
            bean instanceof ScheduledExecutorService) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }
    // Get the user-level type of the bean; e.g. a CGLIB-enhanced bean must be unwrapped to its parent class
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    // nonAnnotatedClasses caches types without any @Scheduled method, so they are not inspected again
    if (!this.nonAnnotatedClasses.contains(targetClass) &&
            AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
        // Repeatable annotations are supported since JDK 8, so collect Method -> Set<@Scheduled>:
        // one method may carry several @Scheduled annotations, each of which becomes its own task
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                            method, Scheduled.class, Schedules.class);
                    return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                });
        // Types without any @Scheduled method go into the nonAnnotatedClasses cache
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
            if (logger.isTraceEnabled()) {
                logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
            }
        }
        else {
            // Walk the Method -> Set<@Scheduled> map and register each entry through processScheduled()
            annotatedMethods.forEach((method, scheduledMethods) ->
                    scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
            if (logger.isTraceEnabled()) {
                logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                        "': " + annotatedMethods);
            }
        }
    }
    return bean;
}
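The "one method, several annotations" lookup in the listing above relies on repeatable annotations. A minimal JDK-only sketch of the same idea follows; it does not use Spring's MethodIntrospector, and the annotation @Every, its container @Everys, and the classes SampleJobs and AnnotationScan are all hypothetical names invented for the illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical repeatable annotation standing in for @Scheduled.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Repeatable(Everys.class)
@interface Every {
    long millis();
}

// Container annotation required by @Repeatable (the role @Schedules plays).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Everys {
    Every[] value();
}

final class SampleJobs {
    @Every(millis = 1000)
    @Every(millis = 5000)
    public void twice() {
    }

    @Every(millis = 3000)
    public void once() {
    }

    public void plain() {
    }
}

final class AnnotationScan {

    // Maps each annotated method name to the number of @Every annotations it carries.
    static Map<String, Integer> scan(Class<?> type) {
        Map<String, Integer> found = new LinkedHashMap<>();
        for (Method method : type.getDeclaredMethods()) {
            // getAnnotationsByType looks through the container annotation as well.
            Every[] annotations = method.getAnnotationsByType(Every.class);
            if (annotations.length > 0) {
                found.put(method.getName(), annotations.length);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(SampleJobs.class));
    }
}
```

Each annotation found this way would become its own task, which is why a single method can end up scheduled more than once.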
processScheduled(Scheduled scheduled, Method method, Object bean) is the method that actually parses the annotation and wraps the task:
// Runnable adapter: invokes the target method reflectively to run the task
public class ScheduledMethodRunnable implements Runnable {
private final Object target;
private final Method method;
public ScheduledMethodRunnable(Object target,Method method) {
this.target = target;
this.method = method;
}
// ... unrelated code omitted
// The core of task execution: make the method accessible, then invoke it reflectively
@Override
public void run() {
try {
ReflectionUtils.makeAccessible(this.method);
this.method.invoke(this.target);
}
catch (InvocationTargetException ex) {
ReflectionUtils.rethrowRuntimeException(ex.getTargetException());
}
catch (IllegalAccessException ex) {
throw new UndeclaredThrowableException(ex);
}
}
}
// Wrap the host bean instance and the method into a ScheduledMethodRunnable adapter
protected Runnable createRunnable(Object target,Method method) {
Assert.isTrue(method.getParameterCount() == 0,"Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method,target.getClass());
return new ScheduledMethodRunnable(target,invocableMethod);
}
// This method is long but not complicated; it does four things:
// 0. Resolve initialDelay / initialDelayString, used for the delayed start of a FixedDelayTask or FixedRateTask
// 1. Resolve the cron attribute first, wrap it as a CronTask and cache it through ScheduledTaskRegistrar
// 2. Resolve fixedDelay / fixedDelayString, wrap them as a FixedDelayTask and cache it through ScheduledTaskRegistrar
// 3. Resolve fixedRate / fixedRateString, wrap them as a FixedRateTask and cache it through ScheduledTaskRegistrar
protected void processScheduled(Scheduled scheduled,Method method,Object bean) {
try {
// Wrap the host bean and the target method into a ScheduledMethodRunnable adapter
Runnable runnable = createRunnable(bean,method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron','fixedDelay(String)',or 'fixedRate(String)' attributes is required";
// Holds the tasks registered for this annotation
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// Determine initial delay
// initialDelayString supports placeholders; it may not be combined with initialDelay (the assertion below rejects setting both)
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0,"Specify 'initialDelay' or 'initialDelayString',not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}
// Check cron expression
// Resolve cron and zone (both support placeholders); if a cron expression is present, register a CronTask
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1,"'initialDelay' not supported for cron triggers");
processedSchedule = true;
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
// Although this looks like it schedules the CronTask, ScheduledTaskRegistrar holds no TaskScheduler yet, so the task is only added to its cache
// The returned task instance is collected here and later stored in the host bean -> Set<ScheduledTask> map
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable,new CronTrigger(cron,timeZone))));
}
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
// Normalize a negative initial delay to 0
if (initialDelay < 0) {
initialDelay = 0;
}
// Resolve fixedDelay and fixedDelayString (only one of the two may be set, as the assertions enforce) and wrap the result as a FixedDelayTask
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule,errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable,fixedDelay,initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule,errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
// Although this looks like it schedules the FixedDelayTask, ScheduledTaskRegistrar holds no TaskScheduler yet, so the task is only added to its cache
// The returned task instance is collected here and later stored in the host bean -> Set<ScheduledTask> map
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable,fixedDelay,initialDelay)));
}
}
// Resolve fixedRate and fixedRateString (only one of the two may be set, as the assertions enforce) and wrap the result as a FixedRateTask
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule,errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable,fixedRate,initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule,errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
// Although this looks like it schedules the FixedRateTask, ScheduledTaskRegistrar holds no TaskScheduler yet, so the task is only added to its cache
// The returned task instance is collected here and later stored in the host bean -> Set<ScheduledTask> map
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable,fixedRate,initialDelay)));
}
}
// Check whether we had any attribute set
Assert.isTrue(processedSchedule,errorMessage);
// Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
// Register all task instances: the key is the host bean instance, the value is a Set<ScheduledTask>; the map is used later to manage the registered tasks
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean,key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}
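In general, this method does four things: it resolves initialDelay / initialDelayString (used for the delayed start of a FixedDelayTask or FixedRateTask); it resolves the cron attribute first and wraps it as a CronTask; it resolves fixedDelay / fixedDelayString and wraps them as a FixedDelayTask; and it resolves fixedRate / fixedRateString and wraps them as a FixedRateTask. Every task is cached through ScheduledTaskRegistrar.
Note that the assertions on processedSchedule require a single @Scheduled annotation to set exactly one of cron, fixedDelay|fixedDelayString and fixedRate|fixedRateString. To have one method wrapped as several tasks at once (for example a CronTask and a FixedRateTask), repeat the @Scheduled annotation on that method. The xxxString attributes are resolved through the embeddedValueResolver, which supports placeholders, so properties from the environment configuration can be used directly (with SpEL support, even nested placeholders work). All successfully parsed task instances are stored in the scheduledTasks map of ScheduledAnnotationBeanPostProcessor:
// Host bean instance -> Set of parsed task instances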
private final Map<Object,Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);
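Why an IdentityHashMap rather than a HashMap? A plausible reading is that two distinct bean instances must never be merged just because their equals() methods consider them equal. The sketch below illustrates that behavior with the JDK only; IdentityRegistry is a hypothetical class, and task instances are reduced to plain strings.

```java
import java.util.IdentityHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class IdentityRegistry {

    // Keys are compared with ==, not equals(): one entry per bean INSTANCE.
    private final Map<Object, Set<String>> tasksByBean = new IdentityHashMap<>(16);

    void register(Object bean, String taskName) {
        synchronized (tasksByBean) {
            tasksByBean.computeIfAbsent(bean, key -> new LinkedHashSet<>(4)).add(taskName);
        }
    }

    int beanCount() {
        synchronized (tasksByBean) {
            return tasksByBean.size();
        }
    }

    int taskCount(Object bean) {
        synchronized (tasksByBean) {
            Set<String> tasks = tasksByBean.get(bean);
            return tasks == null ? 0 : tasks.size();
        }
    }

    public static void main(String[] args) {
        IdentityRegistry registry = new IdentityRegistry();
        // Two distinct instances that are equal according to equals().
        String beanA = new String("host");
        String beanB = new String("host");
        registry.register(beanA, "task1");
        registry.register(beanA, "task2");
        registry.register(beanB, "task1");
        System.out.println(registry.beanCount());      // 2
        System.out.println(registry.taskCount(beanA)); // 2
    }
}
```

With a regular HashMap the two keys above would collapse into a single entry holding all three registrations.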
With parsing and caching done, look at the logic that finally activates all scheduled tasks: the mutually exclusive methods afterSingletonsInstantiated() and onApplicationEvent(). Only one of them ends up calling finishRegistration():
// Called back after all singletons have been instantiated
public void afterSingletonsInstantiated() {
    // Remove resolved singleton classes from cache
    this.nonAnnotatedClasses.clear();
    if (this.applicationContext == null) {
        // Not running in an ApplicationContext -> register tasks early...
        finishRegistration();
    }
}

// Called back after the context has been refreshed
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (event.getApplicationContext() == this.applicationContext) {
        // Running in an ApplicationContext -> register tasks this late...
        // giving other ContextRefreshedEvent listeners a chance to perform
        // their work at the same time (e.g. Spring Batch's job registration).
        finishRegistration();
    }
}
private void finishRegistration() {
    // If a scheduler is already held, hand it to the ScheduledTaskRegistrar
    if (this.scheduler != null) {
        this.registrar.setScheduler(this.scheduler);
    }
    // This check usually passes; the BeanFactory obtained is a DefaultListableBeanFactory
    if (this.beanFactory instanceof ListableBeanFactory) {
        // Fetch every SchedulingConfigurer and call back configureTasks(). This is important:
        // it is the extension hook through which users load scheduled tasks dynamically
        Map<String, SchedulingConfigurer> beans =
                ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
        List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
        // Sort the SchedulingConfigurer instances
        AnnotationAwareOrderComparator.sort(configurers);
        for (SchedulingConfigurer configurer : configurers) {
            configurer.configureTasks(this.registrar);
        }
    }
    // The long block below only tries to obtain a scheduler from the BeanFactory: it looks for a
    // TaskScheduler or ScheduledExecutorService bean, by type and then by name,
    // and sets it on the ScheduledTaskRegistrar once found
    if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
        try {
            // Search for TaskScheduler bean...
            this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.trace("Could not find unique TaskScheduler bean", ex);
            try {
                this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskScheduler bean exists within the context, and " +
                            "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                            "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                            "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                            ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.trace("Could not find default TaskScheduler bean", ex);
            // Search for ScheduledExecutorService bean next...
            try {
                this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
            }
            catch (NoUniqueBeanDefinitionException ex2) {
                logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
                try {
                    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                }
                catch (NoSuchBeanDefinitionException ex3) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                ex2.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.trace("Could not find default ScheduledExecutorService bean", ex2);
                // Giving up -> falling back to default scheduler within the registrar...
                logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
            }
        }
    }
    // Call afterPropertiesSet() on the ScheduledTaskRegistrar to schedule all registered tasks
    this.registrar.afterPropertiesSet();
}
public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean {

    // ... other code omitted

    @Override
    public void afterPropertiesSet() {
        scheduleTasks();
    }

    // Schedule all registered tasks
    @SuppressWarnings("deprecation")
    protected void scheduleTasks() {
        // Note: if no scheduler instance was found, a single thread schedules all tasks
        if (this.taskScheduler == null) {
            this.localExecutor = Executors.newSingleThreadScheduledExecutor();
            this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
        }
        // Schedule all registered tasks that use a custom trigger
        if (this.triggerTasks != null) {
            for (TriggerTask task : this.triggerTasks) {
                addScheduledTask(scheduleTriggerTask(task));
            }
        }
        // Schedule all registered CronTasks
        if (this.cronTasks != null) {
            for (CronTask task : this.cronTasks) {
                addScheduledTask(scheduleCronTask(task));
            }
        }
        // Schedule all registered FixedRateTasks
        if (this.fixedRateTasks != null) {
            for (IntervalTask task : this.fixedRateTasks) {
                addScheduledTask(scheduleFixedRateTask(task));
            }
        }
        // Schedule all registered FixedDelayTasks
        if (this.fixedDelayTasks != null) {
            for (IntervalTask task : this.fixedDelayTasks) {
                addScheduledTask(scheduleFixedDelayTask(task));
            }
        }
    }

    // ... other code omitted
}
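The two-phase behavior seen above (tasks are only cached while no scheduler is known, then scheduled in one pass with a single-thread fallback) can be sketched with the JDK alone. MiniRegistrar is a hypothetical, heavily simplified class, not Spring's ScheduledTaskRegistrar: it knows only fixed-rate tasks and uses one period for all of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class MiniRegistrar {

    private ScheduledExecutorService scheduler; // may stay null until start()
    private final List<Runnable> pending = new ArrayList<>();
    private final List<ScheduledFuture<?>> scheduled = new ArrayList<>();

    void setScheduler(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    // Phase 1: the task is only remembered; nothing runs yet.
    void addFixedRateTask(Runnable task) {
        pending.add(task);
    }

    // Phase 2: pick a fallback scheduler if none was supplied, then schedule everything.
    void start(long periodMillis) {
        if (scheduler == null) {
            // Fallback: one thread runs ALL tasks, so a slow task delays the others.
            scheduler = Executors.newSingleThreadScheduledExecutor();
        }
        for (Runnable task : pending) {
            scheduled.add(scheduler.scheduleAtFixedRate(task, 0L, periodMillis, TimeUnit.MILLISECONDS));
        }
        pending.clear();
    }

    int pendingCount() {
        return pending.size();
    }

    int scheduledCount() {
        return scheduled.size();
    }

    void shutdown() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        MiniRegistrar registrar = new MiniRegistrar();
        registrar.addFixedRateTask(() -> System.out.println("task A"));
        registrar.addFixedRateTask(() -> System.out.println("task B"));
        System.out.println("pending before start: " + registrar.pendingCount());
        registrar.start(1000);
        System.out.println("scheduled after start: " + registrar.scheduledCount());
        registrar.shutdown();
    }
}
```

Because the fallback is single-threaded, supplying a pooled scheduler is worth considering whenever tasks may overlap.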
Pay attention to two questions:
Dynamic loading of scheduled tasks
The scheduling module itself supports configuring scheduled tasks through XML files, based on a NamespaceHandler, but I have always found XML too "heavy" and cumbersome to use. The plan is therefore to add JSON file configuration and JDBC data source configuration (that is, persistent tasks; MySQL is used here). Following the source analysis above, implementing the SchedulingConfigurer interface is what allows custom scheduled tasks to be added from outside before any scheduled task is triggered. First, define some configuration classes for scheduled tasks:
// Enumeration of scheduled task types
@Getter
@RequiredArgsConstructor
public enum ScheduleTaskType {

    CRON("CRON"),
    FIXED_DELAY("FIXED_DELAY"),
    FIXED_RATE("FIXED_RATE");

    private final String type;
}

// Scheduled task configuration; enable is the global switch
@Data
public class ScheduleTaskProperties {

    private Long version;
    private Boolean enable;
    private List<ScheduleTasks> tasks;
}

// A group of scheduled tasks: each method of one host class is an independent task
@Data
public class ScheduleTasks {

    // Deliberately spelled Klass instead of Class to avoid clashing with the keyword
    private String taskHostKlass;
    private Boolean enable;
    private List<ScheduleTaskMethod> taskMethods;
}

// A scheduled task method; enable is the per-task switch, and when it is not set
// the enable value from ScheduleTaskProperties or ScheduleTasks applies
@Data
public class ScheduleTaskMethod {

    private Boolean enable;
    // Task type, matching the "taskType" key of the configuration
    private ScheduleTaskType taskType;
    private String taskDescription;
    private String taskMethod;
    // Time zone, needed when evaluating cron expressions
    private String timeZone;
    private String cronExpression;
    private String intervalMilliseconds;
    private String initialDelayMilliseconds;
}
The design allows several task methods to live in the same host class, which makes it easier to manage tasks of the same kind together, for example:
public class TaskHostClass {

    public void task1() {
    }

    public void task2() {
    }

    // ......

    public void taskN() {
    }
}
As a detail, intervalMilliseconds and initialDelayMilliseconds are expressed in milliseconds and kept as strings, which makes it easy to resolve property placeholders from the configuration file through a StringValueResolver. Next, add an abstract scheduling configurer, AbstractSchedulingConfigurer:
@Slf4j
public abstract class AbstractSchedulingConfigurer implements SchedulingConfigurer,InitializingBean,BeanFactoryAware,EmbeddedValueResolverAware {
@Getter
private StringValueResolver embeddedValueResolver;
private ConfigurableBeanFactory configurableBeanFactory;
private final List<InternalTaskProperties> internalTasks = Lists.newLinkedList();
private final Set<String> tasksLoaded = Sets.newHashSet();
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
configurableBeanFactory = (ConfigurableBeanFactory) beanFactory;
}
@Override
public void afterPropertiesSet() throws Exception {
internalTasks.clear();
internalTasks.addAll(loadTaskProperties());
}
@Override
public void setEmbeddedValueResolver(StringValueResolver resolver) {
embeddedValueResolver = resolver;
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
for (InternalTaskProperties task : internalTasks) {
try {
synchronized (tasksLoaded) {
String key = task.taskHostKlass() + "#" + task.taskMethod();
// Avoid loading the same task twice
if (!tasksLoaded.contains(key)) {
if (task instanceof CronTaskProperties) {
loadCronTask((CronTaskProperties) task,taskRegistrar);
}
if (task instanceof FixedDelayTaskProperties) {
loadFixedDelayTask((FixedDelayTaskProperties) task,taskRegistrar);
}
if (task instanceof FixedRateTaskProperties) {
loadFixedRateTask((FixedRateTaskProperties) task,taskRegistrar);
}
tasksLoaded.add(key);
} else {
log.info("Scheduled task already loaded, host class: {}, task method: {}",task.taskHostKlass(),task.taskMethod());
}
}
} catch (Exception e) {
throw new IllegalStateException(String.format("Failed to load scheduled task, host class: %s, task method: %s",task.taskHostKlass(),task.taskMethod()),e);
}
}
}
private ScheduledMethodRunnable loadScheduledMethodRunnable(String taskHostKlass,String taskMethod) throws Exception {
Class<?> klass = ClassUtils.forName(taskHostKlass,null);
Object target = configurableBeanFactory.getBean(klass);
Method method = ReflectionUtils.findMethod(klass,taskMethod);
if (null == method) {
throw new IllegalArgumentException(String.format("Target method not found, host class: %s, task method: %s",taskHostKlass,taskMethod));
}
Method invocableMethod = AopUtils.selectInvocableMethod(method,target.getClass());
return new ScheduledMethodRunnable(target,invocableMethod);
}
private void loadCronTask(CronTaskProperties pops,ScheduledTaskRegistrar taskRegistrar) throws Exception {
ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(),pops.taskMethod());
String cronExpression = embeddedValueResolver.resolveStringValue(pops.cronExpression());
if (null != cronExpression) {
String timeZoneString = embeddedValueResolver.resolveStringValue(pops.timeZone());
TimeZone timeZone;
if (null != timeZoneString) {
timeZone = TimeZone.getTimeZone(timeZoneString);
} else {
timeZone = TimeZone.getDefault();
}
CronTask cronTask = new CronTask(runnable,new CronTrigger(cronExpression,timeZone));
taskRegistrar.addCronTask(cronTask);
log.info("Loaded CronTask[{}#{}()], cron expression: {}, description: {}",pops.taskHostKlass(),pops.taskMethod(),pops.cronExpression(),pops.taskDescription());
}
}
private void loadFixedDelayTask(FixedDelayTaskProperties pops,ScheduledTaskRegistrar taskRegistrar) throws Exception {
ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(),pops.taskMethod());
long fixedDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds()));
long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds()));
FixedDelayTask fixedDelayTask = new FixedDelayTask(runnable,fixedDelayMilliseconds,initialDelayMilliseconds);
taskRegistrar.addFixedDelayTask(fixedDelayTask);
log.info("Loaded FixedDelayTask[{}#{}()], fixed delay: {} ms, initial delay: {} ms, description: {}",pops.taskHostKlass(),pops.taskMethod(),fixedDelayMilliseconds,initialDelayMilliseconds,pops.taskDescription());
}
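private void loadFixedRateTask(FixedRateTaskProperties pops,ScheduledTaskRegistrar taskRegistrar) throws Exception {
ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(),pops.taskMethod());
long fixedRateMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds()));
long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds()));
FixedRateTask fixedRateTask = new FixedRateTask(runnable,fixedRateMilliseconds,initialDelayMilliseconds);
taskRegistrar.addFixedRateTask(fixedRateTask);
log.info("Loaded FixedRateTask[{}#{}()], fixed rate: {} ms, initial delay: {} ms, description: {}",pops.taskHostKlass(),pops.taskMethod(),fixedRateMilliseconds,initialDelayMilliseconds,pops.taskDescription());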
}
private long parseDelayAsLong(String value) {
if (null == value) {
return 0L;
}
if (value.length() > 1 && (isP(value.charAt(0)) || isP(value.charAt(1)))) {
return Duration.parse(value).toMillis();
}
return Long.parseLong(value);
}
private boolean isP(char ch) {
return (ch == 'P' || ch == 'p');
}
/**
* Load the task configuration; left for subclasses to implement
*/
protected abstract List<InternalTaskProperties> loadTaskProperties() throws Exception;
interface InternalTaskProperties {
String taskHostKlass();
String taskMethod();
String taskDescription();
}
@Builder
protected static class CronTaskProperties implements InternalTaskProperties {
private String taskHostKlass;
private String taskMethod;
private String cronExpression;
private String taskDescription;
private String timeZone;
@Override
public String taskDescription() {
return taskDescription;
}
public String cronExpression() {
return cronExpression;
}
public String timeZone() {
return timeZone;
}
@Override
public String taskHostKlass() {
return taskHostKlass;
}
@Override
public String taskMethod() {
return taskMethod;
}
}
@Builder
protected static class FixedDelayTaskProperties implements InternalTaskProperties {
private String taskHostKlass;
private String taskMethod;
private String intervalMilliseconds;
private String initialDelayMilliseconds;
private String taskDescription;
@Override
public String taskDescription() {
return taskDescription;
}
public String initialDelayMilliseconds() {
return initialDelayMilliseconds;
}
public String intervalMilliseconds() {
return intervalMilliseconds;
}
@Override
public String taskHostKlass() {
return taskHostKlass;
}
@Override
public String taskMethod() {
return taskMethod;
}
}
@Builder
protected static class FixedRateTaskProperties implements InternalTaskProperties {
private String taskHostKlass;
private String taskMethod;
private String intervalMilliseconds;
private String initialDelayMilliseconds;
private String taskDescription;
@Override
public String taskDescription() {
return taskDescription;
}
public String initialDelayMilliseconds() {
return initialDelayMilliseconds;
}
public String intervalMilliseconds() {
return intervalMilliseconds;
}
@Override
public String taskHostKlass() {
return taskHostKlass;
}
@Override
public String taskMethod() {
return taskMethod;
}
}
}
The loadTaskProperties() method loads the task configuration and is left for subclasses to implement.
JSON configuration
The JSON configuration file (scheduling/tasks.json on the classpath) has the following format:
{
  "version": 1,
  "tasks": [
    {
      "taskKlass": "club.throwable.schedule.Tasks",
      "taskMethods": [
        {
          "taskType": "FIXED_DELAY",
          "taskDescription": "processTask1任务",
          "taskMethod": "processTask1",
          "intervalMilliseconds": "5000"
        }
      ]
    }
  ]
}
Each level has an enable attribute, which defaults to true; a task scheduling method is skipped only when the attribute is explicitly set to false. Here we simply extend AbstractSchedulingConfigurer, implement the logic of loading the configuration from the classpath, and define JsonSchedulingConfigurer:
public class JsonSchedulingConfigurer extends AbstractSchedulingConfigurer {
// By default the task configuration JSON file is scheduling/tasks.json on the classpath; it can be overridden with the scheduling.json.config.location property
@Value("${scheduling.json.config.location:scheduling/tasks.json}")
private String location;
@Autowired
private ObjectMapper objectMapper;
@Override
protected List<InternalTaskProperties> loadTaskProperties() throws Exception {
ClassPathResource resource = new ClassPathResource(location);
String content = StreamUtils.copyToString(resource.getInputStream(),StandardCharsets.UTF_8);
ScheduleTaskProperties properties = objectMapper.readValue(content,ScheduleTaskProperties.class);
if (Boolean.FALSE.equals(properties.getEnable()) || null == properties.getTasks()) {
return Lists.newArrayList();
}
List<InternalTaskProperties> target = Lists.newArrayList();
for (ScheduleTasks tasks : properties.getTasks()) {
if (null != tasks) {
List<ScheduleTaskMethod> taskMethods = tasks.getTaskMethods();
if (null != taskMethods) {
for (ScheduleTaskMethod taskMethod : taskMethods) {
if (!Boolean.FALSE.equals(taskMethod.getEnable())) {
if (ScheduleTaskType.CRON == taskMethod.getTaskType()) {
target.add(CronTaskProperties.builder()
.taskMethod(taskMethod.getTaskMethod())
.cronExpression(taskMethod.getCronExpression())
.timeZone(taskMethod.getTimeZone())
.taskDescription(taskMethod.getTaskDescription())
.taskHostKlass(tasks.getTaskKlass())
.build());
}
if (ScheduleTaskType.FIXED_DELAY == taskMethod.getTaskType()) {
target.add(FixedDelayTaskProperties.builder()
.taskMethod(taskMethod.getTaskMethod())
.intervalMilliseconds(taskMethod.getIntervalMilliseconds())
.initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds())
.taskDescription(taskMethod.getTaskDescription())
.taskHostKlass(tasks.getTaskKlass())
.build());
}
if (ScheduleTaskType.FIXED_RATE == taskMethod.getTaskType()) {
target.add(FixedRateTaskProperties.builder()
.taskMethod(taskMethod.getTaskMethod())
.intervalMilliseconds(taskMethod.getIntervalMilliseconds())
.initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds())
.taskDescription(taskMethod.getTaskDescription())
.taskHostKlass(tasks.getTaskKlass())
.build());
}
}
}
}
}
}
return target;
}
}
Add a configuration class and task class:
@Configuration
public class SchedulingAutoConfiguration {
@Bean
public JsonSchedulingConfigurer jsonSchedulingConfigurer(){
return new JsonSchedulingConfigurer();
}
}
// club.throwable.schedule.Tasks
@Slf4j
@Component
public class Tasks {
public void processTask1() {
log.info("processTask1触发..........");
}
}
Start the Spring Boot application. Part of the log output from one run:
2020-03-22 16:24:17.248 INFO 22836 --- [ main] c.t.s.AbstractSchedulingConfigurer : 装载FixedDelayTask[club.throwable.schedule.Tasks#processTask1()]成功,固定延迟间隔:5000 ms,初始延迟执行时间:0 ms,任务描述:processTask1任务
2020-03-22 16:24:22.275 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........
2020-03-22 16:24:27.277 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........
2020-03-22 16:24:32.279 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........
......
Some details here are incomplete: for example, the configuration parameters are not checked for null or empty values, and nothing verifies that the configured values are legal. If you want to build an industrial-grade library, these aspects must be handled.
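As a rough illustration of the missing checks, the sketch below validates one task definition before it is registered. The class TaskConfigValidator and its method are hypothetical and not part of the code above. The cron check only counts fields (the CronSequenceGenerator in Spring 5.2 expects six), and the interval check assumes that values starting with P are ISO-8601 durations, as the isP helper suggests.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: collects readable problems instead of failing on the first one.
final class TaskConfigValidator {

    private TaskConfigValidator() {
    }

    static List<String> validate(String taskType, String hostClass, String method,
                                 String cronExpression, String intervalMilliseconds) {
        List<String> errors = new ArrayList<>();
        if (isBlank(hostClass)) {
            errors.add("taskKlass must not be blank");
        }
        if (isBlank(method)) {
            errors.add("taskMethod must not be blank");
        }
        if ("CRON".equals(taskType)) {
            // Field-count check only; a real implementation should parse the expression.
            if (isBlank(cronExpression) || cronExpression.trim().split("\\s+").length != 6) {
                errors.add("cronExpression must have 6 space-separated fields");
            }
        } else if ("FIXED_DELAY".equals(taskType) || "FIXED_RATE".equals(taskType)) {
            if (!isPositiveInterval(intervalMilliseconds)) {
                errors.add("intervalMilliseconds must be a positive number or duration");
            }
        } else {
            errors.add("unknown taskType: " + taskType);
        }
        return errors;
    }

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    // Accepts plain milliseconds ("5000") or an ISO-8601 duration ("PT5S").
    private static boolean isPositiveInterval(String value) {
        if (isBlank(value)) {
            return false;
        }
        String v = value.trim();
        try {
            char first = v.charAt(0);
            long millis = (first == 'P' || first == 'p')
                    ? Duration.parse(v).toMillis()
                    : Long.parseLong(v);
            return millis > 0;
        } catch (RuntimeException e) {
            // NumberFormatException, DateTimeParseException or ArithmeticException
            return false;
        }
    }
}
```

A configurer could call validate for each entry and either skip invalid tasks with a warning or abort startup, depending on how strict the library should be.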
JDBC data source configuration
MySQL is used here to illustrate the JDBC data source. First, create the scheduling task configuration table schedule_task:
CREATE TABLE `schedule_task`
(
    id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT 'primary key',
    edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
    editor VARCHAR(32) NOT NULL DEFAULT 'admin' COMMENT 'last editor',
    creator VARCHAR(32) NOT NULL DEFAULT 'admin' COMMENT 'creator',
    deleted BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT 'soft delete flag',
    task_host_class VARCHAR(256) NOT NULL COMMENT 'fully qualified name of the task host class',
    task_method VARCHAR(128) NOT NULL COMMENT 'name of the task method',
    task_type VARCHAR(16) NOT NULL COMMENT 'task type',
    task_description VARCHAR(64) NOT NULL COMMENT 'task description',
    cron_expression VARCHAR(128) COMMENT 'cron expression',
    time_zone VARCHAR(32) COMMENT 'time zone',
    interval_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT 'execution interval',
    initial_delay_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT 'initial delay before the first execution',
    UNIQUE uniq_class_method (task_host_class, task_method)
) COMMENT 'scheduling task configuration table';
In fact, the approach is similar to the JSON configuration. First add the spring-boot-starter-jdbc dependency, then write MysqlSchedulingConfigurer:
// DAO
@RequiredArgsConstructor
public class MysqlScheduleTaskDao {
private final JdbcTemplate jdbcTemplate;
private static final ResultSetExtractor<List<ScheduleTask>> MULTI = r -> {
List<ScheduleTask> tasks = Lists.newArrayList();
while (r.next()) {
ScheduleTask task = new ScheduleTask();
tasks.add(task);
task.setId(r.getLong("id"));
task.setCronExpression(r.getString("cron_expression"));
task.setInitialDelayMilliseconds(r.getLong("initial_delay_milliseconds"));
task.setIntervalMilliseconds(r.getLong("interval_milliseconds"));
task.setTimeZone(r.getString("time_zone"));
task.setTaskDescription(r.getString("task_description"));
task.setTaskHostClass(r.getString("task_host_class"));
task.setTaskMethod(r.getString("task_method"));
task.setTaskType(r.getString("task_type"));
}
return tasks;
};
public List<ScheduleTask> selectAllTasks() {
return jdbcTemplate.query("SELECT * FROM schedule_task WHERE deleted = 0",MULTI);
}
}
// MysqlSchedulingConfigurer
@RequiredArgsConstructor
public class MysqlSchedulingConfigurer extends AbstractSchedulingConfigurer {
private final MysqlScheduleTaskDao mysqlScheduleTaskDao;
@Override
protected List<InternalTaskProperties> loadTaskProperties() throws Exception {
List<InternalTaskProperties> target = Lists.newArrayList();
List<ScheduleTask> tasks = mysqlScheduleTaskDao.selectAllTasks();
if (!tasks.isEmpty()) {
for (ScheduleTask task : tasks) {
ScheduleTaskType scheduleTaskType = ScheduleTaskType.fromType(task.getTaskType());
if (ScheduleTaskType.CRON == scheduleTaskType) {
target.add(CronTaskProperties.builder()
.taskMethod(task.getTaskMethod())
.cronExpression(task.getCronExpression())
.timeZone(task.getTimeZone())
.taskDescription(task.getTaskDescription())
.taskHostKlass(task.getTaskHostClass())
.build());
}
if (ScheduleTaskType.FIXED_DELAY == scheduleTaskType) {
target.add(FixedDelayTaskProperties.builder()
.taskMethod(task.getTaskMethod())
.intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds()))
.initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds()))
.taskDescription(task.getTaskDescription())
.taskHostKlass(task.getTaskHostClass())
.build());
}
if (ScheduleTaskType.FIXED_RATE == scheduleTaskType) {
target.add(FixedRateTaskProperties.builder()
.taskMethod(task.getTaskMethod())
.intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds()))
.initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds()))
.taskDescription(task.getTaskDescription())
.taskHostKlass(task.getTaskHostClass())
.build());
}
}
}
return target;
}
}
Remember to add the spring-boot-starter-jdbc and mysql-connector-java dependencies and to register the MySQL scheduling configurer as a bean. Insert a record:
INSERT INTO `schedule_task`(`id`,`edit_time`,`create_time`,`editor`,`creator`,`deleted`,`task_host_class`,`task_method`,`task_type`,`task_description`,`cron_expression`,`time_zone`,`interval_milliseconds`,`initial_delay_milliseconds`) VALUES (1,'2020-03-30 23:46:10','2020-03-30 23:46:10','admin','admin',0,'club.throwable.schedule.Tasks','processTask1','FIXED_DELAY','Test task',NULL,NULL,10000,5000);
Then start the service. Output from one run:
2020-03-30 23:47:27.376 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........
2020-03-30 23:47:37.378 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........
....
Mixed configuration
Sometimes we want to mix the JSON configuration and the JDBC data source configuration, or choose between them dynamically to handle multiple environments. For example, you may want the JSON configuration in the development environment and the JDBC data source configuration in the test and production environments, or even let the JDBC data source configuration override the JSON configuration so that the JDBC configuration always takes precedence. This requires one more layer of abstraction on top of the implementations in the previous two sections. For the design, refer to the controller argument resolvers in Spring MVC; HandlerMethodArgumentResolverComposite works on the same principle.
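One way to picture that extra layer is a composite that holds an ordered list of configuration sources and merges what they return. The sketch below is only an illustration under simplifying assumptions: TaskSource and CompositeTaskSource are hypothetical names, and a task is reduced to a string keyed by "hostClass#method" so that the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical abstraction: one implementation per configuration source (JSON, JDBC, ...).
interface TaskSource {

    // Whether this source is active in the given environment.
    boolean supports(String environment);

    // Task definitions keyed by "hostClass#method".
    Map<String, String> load();
}

// Composite in the spirit of HandlerMethodArgumentResolverComposite:
// it keeps an ordered list of delegates and consults each one that applies.
final class CompositeTaskSource {

    private final List<TaskSource> sources = new ArrayList<>();

    CompositeTaskSource add(TaskSource source) {
        sources.add(source);
        return this;
    }

    // Sources added later override earlier ones for the same key.
    Map<String, String> load(String environment) {
        Map<String, String> merged = new LinkedHashMap<>();
        for (TaskSource source : sources) {
            if (source.supports(environment)) {
                merged.putAll(source.load());
            }
        }
        return merged;
    }
}
```

Registering the JSON source first and the JDBC source second gives JDBC the last word whenever both define the same method, while an environment check in supports lets development run on JSON alone.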
Other precautions
For production use, setting aside task execution logs and fine-grained monitoring for now, two things deserve emphasis:
Solving concurrent execution problems
In general, we need to prevent a task from executing concurrently; consider using the distributed lock provided by Redisson:
// Add the dependency
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>latest version</version>
</dependency>
// Configuration class
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class RedissonAutoConfiguration {
@Autowired
private RedisProperties redisProperties;
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress(String.format("redis://%s:%d",redisProperties.getHost(),redisProperties.getPort()));
if (redisProperties.getDatabase() > 0) {
singleServerConfig.setDatabase(redisProperties.getDatabase());
}
if (null != redisProperties.getPassword()) {
singleServerConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
}
// Distributed lock factory
@Component
public class DistributedLockFactory {
private static final String DISTRIBUTED_LOCK_PATH_PREFIX = "dl:";
@Autowired
private RedissonClient redissonClient;
public DistributedLock provideDistributedLock(String lockKey) {
String lockPath = DISTRIBUTED_LOCK_PATH_PREFIX + lockKey;
return new RedissonDistributedLock(redissonClient,lockPath);
}
}
Since the project depends on the Spring Boot Redis starter, its configuration properties class (RedisProperties) is reused directly. RedissonDistributedLock is a lightweight wrapper around RLock (see the appendix). Usage is as follows:
@Autowired
private DistributedLockFactory distributedLockFactory;
public void task1() {
DistributedLock lock = distributedLockFactory.provideDistributedLock(lockKey);
// Wait up to 20 seconds for the lock; hold it for at most 60 seconds
boolean tryLock = lock.tryLock(20L,60,TimeUnit.SECONDS);
if (tryLock) {
try {
// business logic
}finally {
lock.unlock();
}
}
}
Introducing MDC to trace task execution
MDC is short for Mapped Diagnostic Context. Logging frameworks generally use it to trace the execution of a single thread: for example, if a thread passes through several methods and each method writes logs, MDC can tie the whole call chain together with one unique ID. Here we use org.slf4j.MDC provided by SLF4J:
@Component
public class MappedDiagnosticContextAssistant {
/**
* Run the given task inside an MDC context
*
* @param runnable runnable
*/
public void processInMappedDiagnosticContext(Runnable runnable) {
String uuid = UUID.randomUUID().toString();
MDC.put("TRACE_ID",uuid);
try {
runnable.run();
} finally {
MDC.remove("TRACE_ID");
}
}
}
When the task is executed, it needs to be wrapped in a Runnable instance:
public void task1() {
mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> {
StopWatch watch = new StopWatch();
watch.start();
log.info("Execution started......");
// business logic
watch.stop();
log.info("Execution finished, elapsed: {} ms......",watch.getTotalTimeMillis());
});
}
Combined with the concurrency control mentioned in the previous section, the final task execution method is as follows:
public void task1() {
mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> {
StopWatch watch = new StopWatch();
watch.start();
log.info("Execution started......");
scheduleTaskAssistant.executeInDistributedLock("distributed lock key of the task",() -> {
// actual business logic
});
watch.stop();
log.info("Execution finished, elapsed: {} ms......",watch.getTotalTimeMillis());
});
}
The method here looks awkward. In fact, the distributed lock and MDC handling can be wrapped around the task directly at load time, in a way similar to ScheduledMethodRunnable. This is not expanded here because a detailed treatment would take a lot of space (see the appendix for ScheduleTaskAssistant).
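To give an idea of what such a load-time wrapper could look like, here is a minimal sketch. It is not the author's implementation: GuardedTraceableRunnable is a hypothetical name, java.util.concurrent.locks.Lock stands in for the distributed lock, and a ThreadLocal stands in for org.slf4j.MDC so that the example needs no extra dependency.

```java
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

// Hypothetical decorator applied once when a task is registered,
// so that the task methods themselves contain only business logic.
final class GuardedTraceableRunnable implements Runnable {

    // Stand-in for MDC.put("TRACE_ID", ...) / MDC.remove("TRACE_ID").
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private final Runnable delegate;
    private final Lock lock; // stand-in for a distributed lock
    private final long waitSeconds;

    GuardedTraceableRunnable(Runnable delegate, Lock lock, long waitSeconds) {
        this.delegate = delegate;
        this.lock = lock;
        this.waitSeconds = waitSeconds;
    }

    @Override
    public void run() {
        TRACE_ID.set(UUID.randomUUID().toString());
        try {
            if (!lock.tryLock(waitSeconds, TimeUnit.SECONDS)) {
                // Someone else is running the task; skip this round.
                return;
            }
            try {
                delegate.run();
            } finally {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and give up this round.
            Thread.currentThread().interrupt();
        } finally {
            TRACE_ID.remove();
        }
    }
}
```

With this in place, the configurer would wrap each reflective task invocation once while building the task, and task1() could shrink back to its business logic.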
Summary
In fact, the entire scheduling module of spring-context depends on TaskScheduler implementations, and underneath sits the JUC scheduling thread pool ScheduledThreadPoolExecutor. To understand how the scheduling module works from the ground up, you have to analyze the implementation of ScheduledThreadPoolExecutor. This article has outlined how the spring-context scheduling module loads scheduled tasks and, based on the extension interface SchedulingConfigurer, developed several ways to configure scheduled tasks. For production use, however, monitoring, concurrency control, log tracing and similar features must also be considered, and these make the scheduling module heavier. Gradually you find that the bigger the wheel gets, the more it resembles mainstream scheduling frameworks such as Quartz or Easy Scheduler. The author believes that software engineering sometimes means weighing trade-offs: what should be dropped should be dropped decisively, otherwise you carry a heavy load forever, and how far can you go like that?
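For readers who want to start from the JUC layer, the following sketch shows the two ScheduledThreadPoolExecutor methods that fixed-delay and fixed-rate scheduling correspond to. The class JucSchedulingDemo and its parameters are made up for this illustration; it is a starting point for experiments, not an excerpt from Spring.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class JucSchedulingDemo {

    private JucSchedulingDemo() {
    }

    // Schedules one fixed-delay and one fixed-rate task and waits until each has
    // fired `times` times. Returns true if both did so within the timeout.
    static boolean runBoth(int times, long periodMillis, long timeoutMillis) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        CountDownLatch fixedDelay = new CountDownLatch(times);
        CountDownLatch fixedRate = new CountDownLatch(times);
        try {
            // Fixed delay: the next run starts periodMillis after the previous run ends.
            executor.scheduleWithFixedDelay(fixedDelay::countDown, 0, periodMillis, TimeUnit.MILLISECONDS);
            // Fixed rate: runs are due every periodMillis, measured from the first start.
            executor.scheduleAtFixedRate(fixedRate::countDown, 0, periodMillis, TimeUnit.MILLISECONDS);
            return fixedDelay.await(timeoutMillis, TimeUnit.MILLISECONDS)
                    && fixedRate.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Cancel the periodic tasks and stop the worker thread.
            executor.shutdownNow();
        }
    }
}
```

Calling JucSchedulingDemo.runBoth(3, 20, 2000) is a quick way to observe both modes on a single worker thread, which is also how tasks share a thread in the log output shown earlier (pool-1-thread-1).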
Appendix
ScheduleTaskAssistant:
@RequiredArgsConstructor
@Component
public class ScheduleTaskAssistant {
/**
* 5 seconds
*/
public static final long DEFAULT_WAIT_TIME = 5L;
/**
* 30 seconds
*/
public static final long DEFAULT_LEAVE_TIME = 30L;
private final DistributedLockFactory distributedLockFactory;
/**
* Execute inside a distributed lock
*
* @param waitTime maximum time to wait for the lock
* @param leaveTime maximum time to hold the lock
* @param timeUnit time unit
* @param lockKey key of the lock
* @param task task object
*/
public void executeInDistributedLock(long waitTime,long leaveTime,TimeUnit timeUnit,String lockKey,Runnable task) {
DistributedLock lock = distributedLockFactory.provideDistributedLock(lockKey);
boolean tryLock = lock.tryLock(waitTime,leaveTime,timeUnit);
if (tryLock) {
try {
long waitTimeMillis = timeUnit.toMillis(waitTime);
long start = System.currentTimeMillis();
task.run();
long end = System.currentTimeMillis();
long cost = end - start;
// Keep the lock for at least the wait time so that it is not released too early
if (cost < waitTimeMillis) {
Sleeper.X.sleep(waitTimeMillis - cost);
}
} finally {
lock.unlock();
}
}
}
/**
* Execute inside a distributed lock, using the default times
*
* @param lockKey key of the lock
* @param task task object
*/
public void executeInDistributedLock(String lockKey,Runnable task) {
executeInDistributedLock(DEFAULT_WAIT_TIME,DEFAULT_LEAVE_TIME,TimeUnit.SECONDS,lockKey,task);
}
}
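The Sleeper helper used above is not listed in this article. Judging only from the call Sleeper.X.sleep(...), it might be an enum singleton that hides the checked InterruptedException; the following is a guess at its shape, not the author's code.

```java
// Hypothetical reconstruction of Sleeper: an enum singleton with a sleep method
// that does not force callers to handle InterruptedException.
enum Sleeper {
    X;

    void sleep(long millis) {
        if (millis <= 0) {
            return;
        }
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
```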
RedissonDistributedLock:
@Slf4j
public class RedissonDistributedLock implements DistributedLock {
private final RedissonClient redissonClient;
private final String lockPath;
private final RLock internalLock;
RedissonDistributedLock(RedissonClient redissonClient,String lockPath) {
this.redissonClient = redissonClient;
this.lockPath = lockPath;
this.internalLock = initInternalLock();
}
private RLock initInternalLock() {
return redissonClient.getLock(lockPath);
}
@Override
public boolean isLock() {
return internalLock.isLocked();
}
@Override
public boolean isHeldByCurrentThread() {
return internalLock.isHeldByCurrentThread();
}
@Override
public void lock(long leaseTime,TimeUnit unit) {
internalLock.lock(leaseTime,unit);
}
@Override
public boolean tryLock(long waitTime,long leaseTime,TimeUnit unit) {
try {
return internalLock.tryLock(waitTime,leaseTime,unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(String.format("Acquire lock fail by thread interrupted,path:%s",lockPath),e);
}
}
@Override
public void unlock() {
try {
internalLock.unlock();
} catch (IllegalMonitorStateException ex) {
log.warn("Unlock path:{} error for thread status change in concurrency",lockPath,ex);
}
}
}
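The DistributedLock interface itself is not listed either. Its shape can be inferred from the @Override methods of RedissonDistributedLock; the author's actual interface may differ. The sketch below also adds a hypothetical single-JVM implementation, LocalDistributedLock, which can be handy in unit tests where no Redis is available. It ignores lease times because ReentrantLock has no expiry.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Inferred from the methods RedissonDistributedLock overrides.
interface DistributedLock {

    boolean isLock();

    boolean isHeldByCurrentThread();

    void lock(long leaseTime, TimeUnit unit);

    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);

    void unlock();
}

// Hypothetical in-process implementation for tests; not a distributed lock.
final class LocalDistributedLock implements DistributedLock {

    private final ReentrantLock internalLock = new ReentrantLock();

    @Override
    public boolean isLock() {
        return internalLock.isLocked();
    }

    @Override
    public boolean isHeldByCurrentThread() {
        return internalLock.isHeldByCurrentThread();
    }

    @Override
    public void lock(long leaseTime, TimeUnit unit) {
        // leaseTime is ignored: the lock is held until unlock() is called.
        internalLock.lock();
    }

    @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
        try {
            return internalLock.tryLock(waitTime, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while acquiring lock", e);
        }
    }

    @Override
    public void unlock() {
        // Only the owning thread may release a ReentrantLock.
        if (internalLock.isHeldByCurrentThread()) {
            internalLock.unlock();
        }
    }
}
```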
(End of article. c-7-d e-a-20200324. Amusingly, the author found that for task persistence it is better to use an existing industrial-grade scheduler, so he built a lightweight wrapper on top of Quartz and wrote an admin UI for it; stay tuned for the next article.)