A deep dive into the implementation principles of the Java thread pool
In the previous article, we simply created a new thread whenever we needed one. That is easy to implement, but it has a problem:
if there are many concurrent tasks and each task runs for only a short time, overall efficiency drops sharply, because frequently creating and destroying threads takes time.
So is there a way to make threads reusable, so that after finishing one task a thread can go on to execute other tasks instead of being destroyed?
In Java this is achieved with a thread pool. This article explains Java's thread pool in detail: we start with the methods of the core ThreadPoolExecutor class, then describe its implementation principle, then give a usage example, and finally discuss how to size a thread pool reasonably.
The following is an outline of the contents of this article:
I ThreadPoolExecutor class in Java
II Deeply analyze the implementation principle of thread pool
III Use example
IV How to reasonably configure the size of thread pool
If anything here is inaccurate, criticism and corrections are welcome.
I ThreadPoolExecutor class in Java
The java.util.concurrent.ThreadPoolExecutor class is the core class of the thread pool framework, so to thoroughly understand thread pools in Java you must first understand this class. Let's look at how it is implemented.
Four constructors are provided in the ThreadPoolExecutor class:
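(The constructor signatures below are abridged from the JDK's java.util.concurrent.ThreadPoolExecutor; the bodies are omitted.)

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue)

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory)

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler)

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler)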
As can be seen from the declarations above, ThreadPoolExecutor extends the AbstractExecutorService class and provides four constructors. Looking at the JDK source, the first three constructors all delegate to the fourth one, which does the actual initialization work.
The meanings of the parameters in the constructor are explained below:
corePoolSize: the size of the core pool. This parameter is closely related to the implementation principle described later. After a thread pool is created, by default it contains no threads; a thread is created only when a task arrives, unless prestartAllCoreThreads() or prestartCoreThread() has been called, which, as the names suggest, pre-create either all corePoolSize core threads or a single thread before any task arrives. So by default the pool starts with 0 threads, threads are created as tasks come in, and once the number of threads reaches corePoolSize, newly arriving tasks are placed in the cache queue instead;
maximumPoolSize: the maximum number of threads in the pool. This is also a very important parameter: it is the largest number of threads the pool is ever allowed to create;
keepAliveTime: how long a thread may stay idle before it is terminated. By default keepAliveTime only applies while the pool holds more than corePoolSize threads: a thread that has been idle for keepAliveTime is terminated, until the thread count drops back to corePoolSize. However, if allowCoreThreadTimeOut(boolean) has been called, keepAliveTime also applies when the thread count is not greater than corePoolSize, so the pool can shrink all the way down to 0 threads;
unit: the time unit of keepAliveTime. The TimeUnit class defines seven static values:
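(The complete set, as defined by java.util.concurrent.TimeUnit.)

TimeUnit.DAYS;          // days
TimeUnit.HOURS;         // hours
TimeUnit.MINUTES;       // minutes
TimeUnit.SECONDS;       // seconds
TimeUnit.MILLISECONDS;  // milliseconds
TimeUnit.MICROSECONDS;  // microseconds
TimeUnit.NANOSECONDS;   // nanoseconds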
workQueue: a blocking queue that holds the tasks waiting to be executed. The choice of this parameter is also important and has a significant impact on how the thread pool runs. In general, the candidates are ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue and PriorityBlockingQueue.
ArrayBlockingQueue and PriorityBlockingQueue are used relatively rarely; LinkedBlockingQueue and SynchronousQueue are the usual choices. The thread pool's queuing policy is determined by this BlockingQueue.
threadFactory: the thread factory, used to create new threads;
handler: the policy applied when a task is rejected. There are four built-in values: AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy and DiscardPolicy (described in the section on the task rejection policy below).
How these parameters relate to the behaviour of the thread pool is described in the next section.
From the declarations above we know that ThreadPoolExecutor extends AbstractExecutorService.
AbstractExecutorService is an abstract class that implements the ExecutorService interface.
Let's next look at the ExecutorService interface.
ExecutorService in turn extends the Executor interface; abridged declarations of both interfaces follow:
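(Abridged declarations; the timed overloads of invokeAll/invokeAny are omitted for brevity.)

public interface Executor {
    void execute(Runnable command);
}

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    // ... plus the timed variants of invokeAll and invokeAny
}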
By now the relationship among ThreadPoolExecutor, AbstractExecutorService, ExecutorService and Executor should be clear:
Executor is the top-level interface. It declares a single method, execute(Runnable), with a void return type and a Runnable parameter; as the name suggests, it executes the task passed in;
ExecutorService extends Executor and declares additional methods: submit, invokeAll, invokeAny, shutdown and so on;
the abstract class AbstractExecutorService implements ExecutorService and provides default implementations for essentially all of its methods;
ThreadPoolExecutor, finally, extends AbstractExecutorService.
There are several very important methods in the ThreadPoolExecutor class:
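(Signatures as declared in the JDK.)

public void execute(Runnable command)            // declared in Executor, implemented here
public <T> Future<T> submit(Callable<T> task)    // declared in ExecutorService, implemented in AbstractExecutorService
public Future<?> submit(Runnable task)
public void shutdown()
public List<Runnable> shutdownNow()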
The execute() method is actually declared in Executor and implemented concretely in ThreadPoolExecutor. It is the core method of ThreadPoolExecutor: through it you submit a task to the thread pool for execution.
The submit() method is declared in ExecutorService and already implemented in AbstractExecutorService; ThreadPoolExecutor does not override it. It also submits a task to the pool, but unlike execute() it can return the result of the task: looking at its implementation, it still ends up calling execute(), but it wraps the task in a Future through which the result can be obtained (Future is covered in the next article).
shutdown() and shutdownNow() are used to close the thread pool.
There are many other methods as well,
for example getQueue(), getPoolSize(), getActiveCount() and getCompletedTaskCount(), which expose properties of the pool; interested readers can consult the API documentation.
II Deeply analyze the implementation principle of thread pool
In the previous section, we introduced ThreadPoolExecutor macroscopically. Next, we will analyze the specific implementation principle of thread pool from the following aspects:
1. Thread pool status
2. Execution of tasks
3. Thread initialization in thread pool
4. Task cache queue and queuing strategy
5. Task rejection policy
6. Thread pool shutdown
7. Dynamic adjustment of thread pool capacity
1. Thread pool status
ThreadPoolExecutor defines a volatile variable and several static final variables to represent the possible states of the thread pool:
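(A sketch of the JDK 6-era declarations that this article follows; newer JDKs pack the run state and the worker count into a single AtomicInteger named ctl, but the states mean the same thing.)

volatile int runState;                  // current life-cycle state of the pool
static final int RUNNING    = 0;        // accept new tasks and process queued tasks
static final int SHUTDOWN   = 1;        // accept no new tasks, but keep processing queued tasks
static final int STOP       = 2;        // accept no new tasks, and try to interrupt running tasks
static final int TERMINATED = 3;        // everything has finished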
runState holds the current state of the thread pool; it is declared volatile to guarantee visibility between threads.
The static final variables below it are the possible values of runState.
After a thread pool is created, it is initially in the RUNNING state;
if the shutdown() method is called, the pool enters the SHUTDOWN state: it no longer accepts new tasks, but it waits for all queued tasks to finish executing;
if the shutdownNow() method is called, the pool enters the STOP state: it no longer accepts new tasks and tries to terminate the tasks that are currently executing;
when the pool is in the SHUTDOWN or STOP state, all worker threads have been destroyed, and the task cache queue has been emptied or all tasks have completed, the pool is set to the TERMINATED state.
2. Execution of tasks
Before tracing the whole journey of a task, from being submitted to the thread pool to finishing execution, let's look at some other important member variables of the ThreadPoolExecutor class:
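(A simplified sketch of the JDK 6-era fields; the names and comments are what matters here, not the exact layout.)

private final BlockingQueue<Runnable> workQueue;               // task cache queue: tasks waiting to be executed
private final ReentrantLock mainLock = new ReentrantLock();    // lock guarding changes to the pool state
private final HashSet<Worker> workers = new HashSet<Worker>(); // set of all worker threads

private volatile long keepAliveTime;              // how long an idle thread above the core size survives
private volatile boolean allowCoreThreadTimeOut;  // whether core threads may also time out
private volatile int corePoolSize;                // core pool size
private volatile int maximumPoolSize;             // maximum pool size
private volatile int poolSize;                    // current number of threads in the pool

private volatile RejectedExecutionHandler handler;  // policy applied when a task is rejected
private volatile ThreadFactory threadFactory;       // factory used to create threads

private int largestPoolSize;       // largest number of threads the pool has ever held
private long completedTaskCount;   // total number of tasks that have completed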
The role of each variable is noted above. Here we focus on three of them: corePoolSize, maximumPoolSize and largestPoolSize.
corePoolSize is usually translated as the core pool size; in fact, as I understand it, it is simply the size of the thread pool. A simple example:
Suppose there is a factory with 10 workers, and each worker can do only one task at a time.
As long as one of the 10 workers is idle, an incoming task is assigned to an idle worker;
when all 10 workers are busy, new tasks have to queue up and wait;
if new tasks keep arriving much faster than the workers can handle them, the factory supervisor may take remedial measures, for example hiring 4 temporary workers;
tasks are then assigned to these 4 temporary workers as well;
if even 14 workers cannot keep up, the supervisor may have to consider refusing new tasks or discarding some earlier ones.
When some of the 14 workers become idle and new tasks arrive only slowly, the supervisor may consider letting the 4 temporary workers go and keeping only the original 10; after all, extra workers cost money.
In this example, corePoolSize is 10 and maximumPoolSize is 14 (10 + 4).
In other words, corePoolSize is the size of the thread pool, while maximumPoolSize is, in my view, a remedial measure for when the amount of work suddenly becomes too large.
However, for ease of understanding, corePoolSize is still referred to as the core pool size later in this article.
largestPoolSize is just a variable that records the largest number of threads the pool has ever held; it has nothing to do with the pool's capacity.
Now let's get to the point and see what process a task goes through from submission to final execution.
In the ThreadPoolExecutor class, the core task submission method is execute(). Tasks can also be submitted with submit(), but internally submit() eventually calls execute(), so we only need to study how execute() is implemented:
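(A sketch following the JDK 6-era implementation this walkthrough is based on; later JDKs restructure the method, but the decision logic discussed below is the same.)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // the pool is shut down or saturated
    }
}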
The above code may not seem so easy to understand. Let's explain it sentence by sentence:
First, the submitted task command is checked for null; if it is null, a NullPointerException is thrown.
Then comes this line, which deserves careful attention:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
Since this is a short-circuit OR, the first half is evaluated first: if the current number of threads in the pool is not less than the core pool size, execution goes straight into the if block that follows.
If the current number of threads is less than the core pool size, the second half is evaluated, i.e.
addIfUnderCorePoolSize(command)
If addIfUnderCorePoolSize() returns false, execution continues with the following if block; otherwise the whole method simply returns.
So, if addIfUnderCorePoolSize() returns false, the next check is:
if (runState == RUNNING && workQueue.offer(command))
If the current thread pool is in the RUNNING state, the task is put into the task cache queue; if the pool is not in the RUNNING state, or offering the task to the queue fails, then this is executed:
addIfUnderMaximumPoolSize(command)
If addIfUnderMaximumPoolSize() fails as well, the reject() method is executed to reject the task.
Going back to this earlier check:
if (runState == RUNNING && workQueue.offer(command))
If the current thread pool is in the RUNNING state and the task is successfully placed in the task cache queue, there is one more check:
if (runState != RUNNING || poolSize == 0)
This check is a safety measure against the case where another thread suddenly calls shutdown() or shutdownNow() while this task is being added to the task cache queue. If that has happened, the following is executed:
ensureQueuedTaskHandled(command)
As the name suggests, this is emergency handling that ensures a task already added to the task cache queue still gets dealt with.
Next, let's look at the implementation of two key methods, addIfUnderCorePoolSize() and addIfUnderMaximumPoolSize().
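First, a sketch of addIfUnderCorePoolSize(), following the JDK 6-era source:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // re-check under the lock: the situation may have changed since execute() looked
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}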
This is the implementation of addIfUnderCorePoolSize(). As the name suggests, it is meant to run when the pool is below its core size. Looking at the implementation: the main lock is acquired first, because the state of the pool is about to change, and then an if statement checks once more whether the current number of threads is less than the core pool size. Some readers may wonder: hasn't this already been checked in execute(), since addIfUnderCorePoolSize() is only called when poolSize is less than corePoolSize? Why check again here? The reason is simple: the earlier check in execute() was made without holding the lock, so after it passed, other threads may have submitted tasks and grown the pool, and poolSize may no longer be less than corePoolSize; hence the check is repeated under the lock. The same if statement also verifies that the pool is still in the RUNNING state, because shutdown() or shutdownNow() may have been called from another thread in the meantime. Then comes:
t = addThread(firstTask);
This method is also critical: its parameter is the submitted task, and its return value is of type Thread. Afterwards t is checked for null; if it is null, the thread could not be created (that is, poolSize >= corePoolSize or runState is not RUNNING); otherwise t.start() is called to start the thread.
Let's take a look at the implementation of the addThread() method:
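(A sketch following the JDK 6-era source.)

private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);   // ask the thread factory for a new thread
    if (t != null) {
        w.thread = t;                        // remember which thread this worker runs in
        workers.add(w);
        int nt = ++poolSize;
        if (nt > largestPoolSize)            // keep track of the high-water mark
            largestPoolSize = nt;
    }
    return t;
}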
In addThread(), a Worker object is first created for the submitted task; then the thread factory threadFactory is asked to create a new thread t, the reference to t is stored in the Worker's member variable thread, and the Worker is added to the workers set via workers.add(w).
Let's take a look at the implementation of the Worker class:
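(A trimmed sketch of the JDK 6-era class; run() is shown in full a little further below.)

private final class Worker implements Runnable {
    private final ReentrantLock runLock = new ReentrantLock(); // held while a task is being executed
    private Runnable firstTask;   // the task this worker was created for
    Thread thread;                // the thread this worker runs in

    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
    }

    public void run() {
        // ... shown below ...
    }
}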
Worker actually implements the Runnable interface, so Thread t = threadFactory.newThread(w); above has essentially the same effect as the following statement:
Thread t = new Thread(w);
That is, a Runnable task is passed in and is executed inside the thread t.
Since Worker implements the Runnable interface, its core method is run():
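(Again a sketch of the JDK 6-era version.)

public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        // execute the initial task first, then keep pulling tasks from the queue
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);   // bookkeeping when this worker exits
    }
}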
As the run() method shows, the worker first executes the task firstTask that was passed in through the constructor, by calling runTask(firstTask); after that it keeps obtaining new tasks via getTask() in a while loop. Where do those tasks come from? Naturally, from the task cache queue. getTask() is a method of the ThreadPoolExecutor class, not of the Worker class; its implementation follows:
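(Sketch of the JDK 6-era version.)

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)          // STOP or TERMINATED: take no more tasks
                return null;
            Runnable r;
            if (state == SHUTDOWN)         // help drain the queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); // wait at most keepAliveTime
            else
                r = workQueue.take();      // core thread without timeout: block until a task arrives
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN)  // wake up the other idle workers too
                    interruptIdleWorkers();
                return null;
            }
            // otherwise loop and try again
        } catch (InterruptedException ie) {
            // ignore and re-check the pool state on the next iteration
        }
    }
}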
In getTask(), the current pool state is checked first: if runState is greater than SHUTDOWN (i.e. STOP or TERMINATED), null is returned immediately.
If runState is SHUTDOWN or RUNNING, a task is fetched from the task cache queue.
If the current number of threads is greater than the core pool size corePoolSize, or idle timeout is allowed for core-pool threads, the task is fetched with poll(time, timeUnit), which waits at most the given time and returns null if no task could be obtained in time.
Then the fetched task r is checked: if it is null, workerCanExit() is called to decide whether the current worker may exit. Its implementation:
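(JDK 6-era sketch.)

private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    try {
        canExit = runState >= STOP ||
                  workQueue.isEmpty() ||
                  (allowCoreThreadTimeOut && poolSize > 1);
    } finally {
        mainLock.unlock();
    }
    return canExit;
}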
That is, the worker may exit if the thread pool is in the STOP state, or the task queue is empty, or idle timeout is allowed for core-pool threads and there is more than one thread in the pool. If the worker is allowed to exit, interruptIdleWorkers() is called to interrupt the other idle workers. Let's take a look at its implementation:
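(JDK 6-era sketch.)

void interruptIdleWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)   // ask every worker to interrupt itself if it is idle
            w.interruptIfIdle();
    } finally {
        mainLock.unlock();
    }
}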
As the implementation shows, it simply calls interruptIfIdle() on each Worker. Inside the Worker's interruptIfIdle() method:
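(A sketch of the JDK 6-era Worker.interruptIfIdle().)

void interruptIfIdle() {
    final ReentrantLock runLock = this.runLock;
    // tryLock() succeeds only if the worker is not currently running a task,
    // because runTask() holds runLock while a task is executing
    if (runLock.tryLock()) {
        try {
            if (thread != Thread.currentThread())
                thread.interrupt();
        } finally {
            runLock.unlock();
        }
    }
}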
There is a rather clever design decision here. If we were designing the thread pool ourselves, we might introduce a task-dispatching thread that, whenever it finds an idle worker, takes a task from the cache queue and hands it to that idle thread. But that approach is not used here, because it would mean managing an extra dispatcher thread, which quietly adds difficulty and complexity; instead, the worker threads that execute tasks fetch them from the task cache queue directly.
Now let's look at the implementation of addIfUnderMaximumPoolSize(). The idea is very similar to addIfUnderCorePoolSize(); the only difference is that addIfUnderMaximumPoolSize() is executed when the number of threads in the pool has reached the core pool size and offering the task to the task queue has failed:
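(JDK 6-era sketch; compare it with addIfUnderCorePoolSize() above.)

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // identical to addIfUnderCorePoolSize, except for the bound being checked
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}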
As you can see, it is almost identical to addIfUnderCorePoolSize(); the only difference is the condition in the if statement, which checks poolSize < maximumPoolSize instead of poolSize < corePoolSize.
At this point, most readers should have a basic understanding of the whole process from a task being submitted to the thread pool to its execution. To summarize:
1) First, be clear about what corePoolSize and maximumPoolSize mean;
2) Second, know what role the Worker plays;
3) Know the processing strategy once a task is submitted to the thread pool. The main points are:
if the number of threads in the pool is less than corePoolSize, a new thread is created to execute each submitted task;
if the number of threads is >= corePoolSize, the pool tries to put each new task into the task cache queue; if that succeeds, the task waits there until an idle thread takes it out and executes it; if it fails (typically because the queue is full), the pool tries to create a new thread to execute the task;
if the number of threads has already reached maximumPoolSize, the task rejection policy is applied;
if the number of threads exceeds corePoolSize, a thread whose idle time exceeds keepAliveTime is terminated, until the thread count is no longer greater than corePoolSize; and if idle timeout is allowed for core-pool threads, core threads whose idle time exceeds keepAliveTime are terminated as well.
3. Thread initialization in thread pool
By default, after the thread pool is created there are no threads in it; threads are created only once tasks are submitted.
In practice, if you need threads to be created immediately after the pool is created, you can use the following two methods:
prestartCoreThread(): initializes one core thread; prestartAllCoreThreads(): initializes all core threads. Their implementations look like this:
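(JDK 6-era sketch.)

public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null);    // note the null first task
}

public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))    // keep adding workers until the core pool is full
        ++n;
    return n;
}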
Note that the argument passed in is null. Following the analysis in Section 2, a worker started with a null first task goes straight into getTask() and blocks there, waiting for a task to appear in the task queue.
4. Task cache queue and queuing strategy
We have mentioned the task cache queue several times already; this is workQueue, which holds the tasks that are waiting to be executed.
The type of workQueue is BlockingQueue<Runnable>, and it is usually one of the following three types:
1) ArrayBlockingQueue: an array-based, first-in-first-out queue whose size must be specified when it is created;
2) LinkedBlockingQueue: a linked-list-based, first-in-first-out queue; if no size is specified at creation, it defaults to Integer.MAX_VALUE;
3) SynchronousQueue: a special queue that does not hold submitted tasks at all; instead, each new task is handed straight to a thread, creating a new one if necessary.
5. Task rejection policy
When the task cache queue is full and the number of threads in the pool has reached maximumPoolSize, any further tasks are handled according to the task rejection policy. There are four built-in strategies:
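The four built-in handlers are static nested classes of ThreadPoolExecutor:

new ThreadPoolExecutor.AbortPolicy();          // the default: throw RejectedExecutionException
new ThreadPoolExecutor.CallerRunsPolicy();     // run the rejected task in the caller's own thread
new ThreadPoolExecutor.DiscardOldestPolicy();  // drop the oldest queued task, then retry execute()
new ThreadPoolExecutor.DiscardPolicy();        // silently drop the rejected task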
6. Thread pool shutdown
ThreadPoolExecutor provides two methods for shutting down the thread pool, shutdown() and shutdownNow(), where:
shutdown(): does not terminate the pool immediately; it stops accepting new tasks and terminates only after all the tasks in the task cache queue have finished executing;
shutdownNow(): terminates the pool immediately, attempts to interrupt the tasks currently executing, empties the task cache queue, and returns the tasks that were never executed.
7. Dynamic adjustment of thread pool capacity
ThreadPoolExecutor provides methods for dynamically adjusting the pool's capacity: setCorePoolSize() and setMaximumPoolSize().
setCorePoolSize(): sets the core pool size; setMaximumPoolSize(): sets the maximum number of threads the pool may create. When these values are increased at runtime, ThreadPoolExecutor may immediately create new threads to execute tasks.
III Use example
We discussed the implementation principle of the thread pool above; in this section, let's look at how it is used in practice:
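Below is a small, self-contained example in the spirit of the original one (the class and task names, such as MyTask, are illustrative). It creates a pool with a core size of 5, a maximum size of 10 and a bounded queue of capacity 5, then submits 15 tasks:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));

        for (int i = 0; i < 15; i++) {
            executor.execute(new MyTask(i));
            System.out.println("pool size: " + executor.getPoolSize()
                    + ", queued: " + executor.getQueue().size()
                    + ", completed: " + executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }
}

class MyTask implements Runnable {
    private final int taskNum;

    MyTask(int num) {
        this.taskNum = num;
    }

    @Override
    public void run() {
        System.out.println("Executing task " + taskNum);
        try {
            Thread.sleep(4000);   // simulate some work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("Task " + taskNum + " finished");
    }
}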
From the execution results (omitted here) you can see that once the pool already holds 5 threads, further tasks are placed in the task cache queue, and once the queue is full, new threads are created up to the maximum. If you change the for loop in the program above to submit 20 tasks, a task rejection exception is thrown.
However, the Java documentation does not encourage using ThreadPoolExecutor directly; instead it recommends the static factory methods of the Executors class, chiefly newFixedThreadPool, newSingleThreadExecutor and newCachedThreadPool, to create a thread pool.
The following is how these three static methods are implemented:
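(The bodies below follow the JDK source; newSingleThreadExecutor additionally wraps the pool in a delegating class so that its configuration cannot be changed afterwards.)

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}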
From their implementations we can see that they all just call the ThreadPoolExecutor constructor with preconfigured parameters.
The pool created by newFixedThreadPool has equal corePoolSize and maximumPoolSize values, and it uses a LinkedBlockingQueue;
newSingleThreadExecutor sets both corePoolSize and maximumPoolSize to 1 and also uses a LinkedBlockingQueue;
newCachedThreadPool sets corePoolSize to 0 and maximumPoolSize to Integer.MAX_VALUE and uses a SynchronousQueue; that is, when a task arrives a thread is created to run it, and a thread that has been idle for more than 60 seconds is destroyed.
In practice, if the three static methods provided by Executors meet your requirements, prefer them, because configuring the parameters of ThreadPoolExecutor by hand is somewhat fiddly and has to be done according to the actual type and volume of tasks.
In addition, if none of them meets your requirements, you can extend the ThreadPoolExecutor class and override its methods.
IV How to reasonably configure the size of thread pool
This section discusses an important topic: how to reasonably configure the thread pool size. It is for reference only.
Generally, the thread pool size needs to be configured according to the type of task:
If the tasks are CPU-intensive, you want to keep the CPUs as busy as possible; a reference value for the pool size is NCPU + 1, where NCPU is the number of CPU cores.
For IO-intensive tasks, a reference value is 2 * NCPU; a small sketch of how to derive these values at runtime follows.
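(A minimal sketch; the constants 1 and 2 are just the rule-of-thumb factors mentioned above.)

int nCpu = Runtime.getRuntime().availableProcessors();
int cpuBoundPoolSize = nCpu + 1;   // CPU-intensive: roughly one thread per core, plus one spare
int ioBoundPoolSize  = 2 * nCpu;   // IO-intensive: threads spend much of their time blocked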
Of course, this is only a reference value. The specific settings need to be adjusted according to the actual situation. For example, you can first set the thread pool size to the reference value, and then observe the task operation, system load and resource utilization to make appropriate adjustments.
Summary
That is all for this deep dive into the implementation principle of the Java thread pool. I hope it is helpful to you. If you have any questions, feel free to leave a comment and I will reply as soon as possible. Thank you for your support!