How Future and FutureTask work in Java and how they combine with thread pools
Future and FutureTask in Java are usually used together with a thread pool to obtain the return value of a task after the pool has executed it. Let's assume that we build a thread pool es through an Executors factory method. es has two ways to execute a task. One is es.execute(runnable), which has no return value. The other is es.submit(runnable) or es.submit(callable), which returns a Future object; calling get() on that Future yields the return value.
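The difference between the two entry points can be sketched as follows. This is a minimal illustration, not code from the JDK; the class name SubmitVersusExecute and the helper submitAndGet are made up for the example, and Executors.newFixedThreadPool(2) is just one possible factory method.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the JDK.
public class SubmitVersusExecute {

    // Submits a Callable and returns the value obtained through Future.get().
    static int submitAndGet() {
        ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            // execute(Runnable): fire and forget, nothing is returned.
            es.execute(() -> System.out.println("executed without a return value"));

            // submit(Callable): returns a Future that will hold the result.
            Future<Integer> future = es.submit(() -> 1 + 2);
            return future.get(); // blocks until the task has finished
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("result = " + submitAndGet());
    }
}
```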
Future
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
Future is an interface that lets us check whether a task has finished, wait for it to finish, and obtain its result. Calling get() returns the result once the task has completed; if the task has not completed yet, the calling thread blocks until it does. We can try to cancel a task by calling cancel(). cancel() returns true if the cancellation succeeds, and false if the task could not be cancelled, typically because it has already completed. A task that is cancelled before it starts will never run. isDone() reports whether the task has finished (normally, with an exception, or through cancellation), and isCancelled() reports whether it was cancelled before it completed normally.
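The behaviour of these methods can be observed with a short experiment. The sketch below assumes a single-thread pool and a deliberately slow task; the class name FutureApiDemo and the helper cancelDemo are hypothetical. The false returned by the second cancel() call is what FutureTask, the implementation returned by the pool, does for a task that is already cancelled.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the JDK.
public class FutureApiDemo {

    // Returns "timedOut,firstCancel,secondCancel,isCancelled,isDone".
    static String cancelDemo() {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            // A task that sleeps long enough to be cancelled while running.
            Future<String> slow = es.submit(() -> {
                Thread.sleep(60_000);
                return "never returned";
            });

            boolean timedOut = false;
            try {
                slow.get(100, TimeUnit.MILLISECONDS); // bounded wait
            } catch (TimeoutException e) {
                timedOut = true; // the task is still running
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }

            boolean firstCancel = slow.cancel(true);  // interrupts the worker
            boolean secondCancel = slow.cancel(true); // already cancelled
            return timedOut + "," + firstCancel + "," + secondCancel
                    + "," + slow.isCancelled() + "," + slow.isDone();
        } finally {
            es.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelDemo());
    }
}
```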
The thread pool (in AbstractExecutorService) provides several submit methods:
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
All three return a Future object. On closer inspection, every method first converts the Runnable or Callable into a RunnableFuture object. RunnableFuture is an interface that extends both Runnable and Future:
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
Then execute(runnable) is called; we will discuss its concrete implementation later. Finally, the RunnableFuture object is returned. The RunnableFuture interface has a concrete implementation class, FutureTask, which we discuss next.
FutureTask
public class FutureTask<V> implements RunnableFuture<V>
FutureTask implements the RunnableFuture interface. We now know that what is finally returned is a FutureTask object ftask, and that ftask.get() yields the return value of the submitted task. How is this implemented? That is what this article is about.
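Because FutureTask is both a Runnable and a Future, it does not strictly need a thread pool: it can be handed to a plain Thread and queried afterwards. The following is a small sketch of that idea; the class name FutureTaskWithoutPool and the helper runOnPlainThread are invented for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class, not part of the JDK.
public class FutureTaskWithoutPool {

    // Runs a FutureTask on a plain thread and returns its result.
    static int runOnPlainThread() {
        // FutureTask wraps the Callable; the lambda is the task body.
        FutureTask<Integer> ftask = new FutureTask<>(() -> 6 * 7);

        // As a Runnable, the FutureTask can be the target of a Thread.
        Thread worker = new Thread(ftask);
        worker.start();

        try {
            // As a Future, the same object hands back the result.
            return ftask.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnPlainThread());
    }
}
```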
We can first guess how it is implemented. Runnable's run() has no return value, so when es.submit() is given only a Runnable, ftask.get() returns null. When a result argument is also passed, get() returns that result. If the argument is a Callable, its call() has a return value; call() is invoked inside the run() method of the converted RunnableFuture object ftask, and its return value is stored in a field. Finally, the get() method of ftask returns that value. Is the guess right? Let's look directly at the source code.
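Before reading the source, the guess about the three forms of submit() can be checked from the outside. This is only an illustrative sketch; the class name ThreeSubmitForms, the helper threeForms and the string values are assumptions made for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the JDK.
public class ThreeSubmitForms {

    // Returns the three values obtained from get(), joined by commas.
    static String threeForms() {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Runnable noop = () -> { };
            Callable<String> callable = () -> "from call()";

            Future<?> onlyRunnable = es.submit(noop);            // get() yields null
            Future<String> withResult = es.submit(noop, "preset"); // get() yields "preset"
            Future<String> fromCallable = es.submit(callable);   // get() yields call()'s value

            return onlyRunnable.get() + "," + withResult.get() + "," + fromCallable.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threeForms());
    }
}
```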
Methods that convert a Runnable object into a RunnableFuture:
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
Executors::callable()
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
The nested class RunnableAdapter of Executors:
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
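The adapter can be exercised through the public Executors.callable(Runnable, T) method shown above. A minimal sketch, in which the class name RunnableAdapterDemo, the helper adapt and the strings are made up for illustration:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the JDK.
public class RunnableAdapterDemo {

    // Wraps a Runnable in a Callable and shows that call() runs the
    // Runnable first and then returns the preset result.
    static String adapt() {
        StringBuilder log = new StringBuilder();
        Runnable task = () -> log.append("ran;");

        Callable<String> adapted = Executors.callable(task, "done");
        try {
            String result = adapted.call(); // runs task, then returns "done"
            return log + result;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(adapt());
    }
}
```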
Methods that convert a Callable object into a RunnableFuture:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
Next, let's look at the execution process of execute(runnable):
The final implementation of execute(runnable) is in ThreadPoolExecutor. Most of the thread pools returned by the Executors factory methods are built by passing different parameters to the ThreadPoolExecutor constructor.
ThreadPoolExecutor::execute(runnable):
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
As shown above, this process is divided into three steps:
Step 1:
If the number of threads in the pool is less than corePoolSize, addWorker is called to start a new thread with the current task as its first task. addWorker checks the run state and the worker count atomically and returns false when a thread should not be added, which prevents threads from being added by mistake.
Step 2:
Otherwise, if the pool is running and the task can be placed in the work queue, the state is checked again, because existing threads may have died or the pool may have been shut down since the last check. If the pool is no longer running, the task is removed from the queue and rejected; if no worker threads are left, a new one is started.
Step 3:
If the task cannot be queued, addWorker is called to try to add a new thread. If that fails, the pool is either shut down or saturated, and the task is rejected.
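The three steps can be made visible with a deliberately tiny pool. The sketch below assumes corePoolSize = 1, maximumPoolSize = 2, a queue of capacity 1 and the default rejection policy; the class name ExecuteStepsDemo and the helper observe are hypothetical. Every task blocks on a latch so that the pool size and queue length stay stable while they are read.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
public class ExecuteStepsDemo {

    // Returns "poolSize/queueSize" after each accepted task, then the
    // outcome of the fourth task.
    static String observe() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        StringBuilder sb = new StringBuilder();
        try {
            pool.execute(blocker); // step 1: below corePoolSize, core thread started
            sb.append(pool.getPoolSize()).append('/').append(pool.getQueue().size()).append(' ');
            pool.execute(blocker); // step 2: core thread busy, task is queued
            sb.append(pool.getPoolSize()).append('/').append(pool.getQueue().size()).append(' ');
            pool.execute(blocker); // step 3: queue full, extra thread started
            sb.append(pool.getPoolSize()).append('/').append(pool.getQueue().size()).append(' ');
            try {
                pool.execute(blocker); // queue full and pool at maximum size
                sb.append("accepted");
            } catch (RejectedExecutionException e) {
                sb.append("rejected");
            }
            return sb.toString();
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```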
Executing execute(runnable) eventually causes a worker thread to call the run() method of the Runnable, that is, the run() method of the FutureTask object ftask. The source code is as follows:
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
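The return value is obtained by result = c.call() and then stored by set(result). Therefore, the value returned by the get() method is this result. If call() throws, setException(ex) records the exception instead, and get() throws an ExecutionException that wraps it.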
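The idea of "run() stores the result, get() waits for it" can be reduced to a few lines. The class below is a simplified sketch written for this article, not the JDK implementation: MiniFutureTask is a hypothetical name, and it uses a CountDownLatch to block callers of get(), whereas the real FutureTask tracks a state field and parks waiting threads itself. Cancellation is left out entirely.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

// Hypothetical, simplified stand-in for FutureTask (no cancellation).
public class MiniFutureTask<V> implements Runnable {
    private final Callable<V> callable;
    private final CountDownLatch done = new CountDownLatch(1);
    private V outcome;         // written before countDown(), read after await()
    private Throwable failure; // set instead of outcome if call() throws

    public MiniFutureTask(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        try {
            outcome = callable.call(); // same role as set(result)
        } catch (Throwable t) {
            failure = t;               // same role as setException(ex)
        } finally {
            done.countDown();          // wake up every thread blocked in get()
        }
    }

    public V get() throws InterruptedException, ExecutionException {
        done.await();                  // block until run() has finished
        if (failure != null)
            throw new ExecutionException(failure);
        return outcome;
    }

    // Runs a task on a plain thread and returns what get() reports.
    static int demo() {
        MiniFutureTask<Integer> task = new MiniFutureTask<>(() -> 21 * 2);
        new Thread(task).start();
        try {
            return task.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```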