Java Concurrent Programming – extend callable future

premise

I've been watching the JUC thread pool java util. concurrent. ThreadPoolExecutor source code implementation, which learned about Java util. concurrent. Implementation principle of future. From the current Java util. concurrent. In terms of the implementation of future, although asynchronous task submission is implemented, the process of obtaining task results needs to actively call future #get() or future #get (long timeout, timeunit unit). The former is blocked, and the latter may need polling when the execution time of asynchronous task is uncertain. These two situations are a little contrary to the original intention of asynchronous call. Therefore, the author wants to expand the future that supports (listens) callback on the premise of combining the implementation principle of future, and refers to guava's enhanced listenablefuture. The JDK used in this paper is jdk11, and other versions may not be suitable.

Briefly analyze the implementation principle of future

Virtual example deduction

When designing the JUC thread pool, concurrent master Doug lea provided a top-level executor interface executor:

public interface Executor {

    void execute(Runnable command);
}    

In fact, the method executor #execute () defined here is the core interface of the whole thread pool system, that is, the core thread defined by ThreadPoolExecutor Additional threads (maximum thread capacity of thread pool - number of core threads) are created lazily when the interface submits tasks, that is, the extended functions of the executorservice interface are extended based on executor #execute(). The executor #execute () method simply puts the runnable object of the task instance into the thread pool and allocates an appropriate thread for execution. However, since the return value of the method is of void type, we cannot perceive when the task is completed. At this time, the runnable task instance needs to be wrapped (pseudo code + pseudo logic below):

// 下面这个Wrapper和Status类是笔者虚构出来
@requiredArgsConstructor
class Wrapper implements Runnable{

    private final Runnable target;
    private Status status = Status.of("初始化");

    @Override
    public void run(){
        try{
           target.run();
           status = Status.of("执行成功");
        }catch(Throwable t){
           status = Status.of("执行异常"); 
        }
    }
}

We only need to put the new wrapper (original runnable instance) into the thread pool for execution, so we can know the execution status of asynchronous tasks through the defined status record variable, And when the execution is completed (including normal execution and abnormal execution). This only solves the problem of obtaining the status of task execution, but the return value of the executor #execute() method is of void type, which makes it impossible to callback the execution result of the runnable object. At this time, you need to define an interface that can call back the execution result. In fact, there is an existing interface callable:

@FunctionalInterface
public interface Callable<V> {

    V call() throws Exception;
}    

There is a problem here: since executor #execute() only receives runnable parameters, we need to adapt the callable interface to the runnable interface. At this time, we can simply delegate:

@requiredArgsConstructor
class Wrapper implements Runnable{

    private final Callable callable;
    private Status status = Status.of("初始化");
    @Getter
    private Object outcome;

    @Override
    public void run(){
        try{
           outcome = callable.call();
           status = Status.of("执行成功");
        }catch(Throwable t){
           status = Status.of("执行异常"); 
           outcome = t;
        }
    }
}

Here, the callable instance is directly delegated to the wrapper, which implements the runnable interface, and the execution results can be directly stored in the object outcome of the defined object type. When we perceive that the execution state has ended, we can extract the execution results from the outcome.

Implementation of future

The above summary only makes a relatively reasonable virtual deduction for the implementation of future. In fact, runnablefuture is a common composite interface in JUC, which implements runnable and future at the same time:

public interface RunnableFuture<V> extends Runnable,Future<V> {
    
    void run();
}

The fictional wrapper class mentioned in the previous section is implemented in Java. Com in JUC util. concurrent. Futuretask is the adapter of callable and runnable. Futuretask implements the runnablefuture interface:

public class FutureTask<V> implements RunnableFuture<V> {

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile,protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
    
    // 省略其他代码
}    

Note that the core attribute state indicates the execution status, and the outcome carries the execution results. Next, look at the method executorservice#submit (), which submits callable tasks:

public interface ExecutorService extends Executor {

    // 省略其他接口方法

    <T> Future<T> submit(Callable<T> task);
}    

When we submit a callable task through the executorservice#submit () method above, we actually do the following steps:

If we need to get the results, we can get them through future #get() or future #get (long timeout, timeunit). When calling these two methods, refer to the method implementation in futuretask. The steps are as follows:

In fact, after analyzing so many, the author wants to point out that after the callable task is submitted to the thread pool for execution (including normal execution and abnormal execution), it will call back the hook method futuretask #done(). This is the theoretical basis for our extensible future.

Extend callable future

First do a coding implementation, and then simply test its function.

Coding implementation

First define a sub interface listenablefuture of the future interface, which is used to add callbacks that can be listened to:

public interface ListenableFuture<V> extends Future<V> {

    void addCallback(ListenableFutureCallback<V> callback,Executor executor);
}

Listenablefuturecallback is a functional callback interface:

@FunctionalInterface
public interface ListenableFutureCallback<V> {

    void callback(V value,Throwable throwable);
}

For listenablefuturecallback, the callback result value and throwable are mutually exclusive. After normal execution, value will be the execution result value, and throwable is null; When the exception is executed, value will be null and throwable will be the exception instance thrown. If you are more accustomed to handling the results of normal execution and abnormal execution separately, the listenablefuturecallback can be defined as follows:

public interface ListenableFutureCallback<V> {

    void onSuccess(V value);

    void onError(Throwable throwable);
}

Next, define the listenableexecutorservice interface and inherit the executorservice interface:

public interface ListenableExecutorService extends ExecutorService {

    <T> ListenableFuture<T> listenableSubmit(Callable<T> callable);

    /**
     * 定义这个方法是因为有些时候由于任务执行时间非常短,有可能通过返回的ListenableFuture实例添加回调之前已经执行完毕,因此可以支持显式传入回调
     *
     * @param callable  callable
     * @param callbacks callbacks
     * @param executor  executor
     * @return ListenableFuture
     */
    <T> ListenableFuture<T> listenableSubmit(Callable<T> callable,List<ListenableFutureCallback<T>> callbacks,Executor executor);
}

Then add an execution unit adapter listenablefuturecallbackrunnable to carry the call triggered by each callback (implement the runnable interface to support asynchronous execution):

@requiredArgsConstructor
public class ListenableFutureCallbackRunnable<V> implements Runnable {

    private final ListenableFutureCallback<V> callback;
    private final V value;
    private final Throwable throwable;

    @Override
    public void run() {
        callback.callback(value,throwable);
    }
}

Next, you need to define a subclass listenablefuturetask of futuretask. The core logic is to override the futuretask #done() method to trigger the callback:

// ListenableFutureTask
public class ListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {

    private final List<Execution<V>> executions = new ArrayList<>();

    public ListenableFutureTask(Callable<V> callable) {
        super(callable);
    }

    public ListenableFutureTask(Runnable runnable,V result) {
        super(runnable,result);
    }

    public static <V> ListenableFutureTask<V> newTaskFor(Callable<V> callable) {
        return new ListenableFutureTask<>(callable);
    }

    @Override
    protected void done() {
        Iterator<Execution<V>> iterator = executions.iterator();
        Throwable throwable = null;
        V value = null;
        try {
            value = get();
        } catch (Throwable t) {
            throwable = t;
        }
        while (iterator.hasNext()) {
            Execution<V> execution = iterator.next();
            ListenableFutureCallbackRunnable<V> callbackRunnable = new ListenableFutureCallbackRunnable<>(execution.getCallback(),value,throwable);
            // 异步回调
            if (null != execution.getExecutor()) {
                execution.getExecutor().execute(callbackRunnable);
            } else {
                // 同步回调
                callbackRunnable.run();
            }
        }
    }

    @Override
    public void addCallback(ListenableFutureCallback<V> callback,Executor executor) {
        Execution<V> execution = new Execution<>();
        execution.setCallback(callback);
        execution.setExecutor(executor);
        executions.add(execution);
    }
}

// Execution - 承载每个回调实例和对应的Executor,Executor实例为null则进行同步回调
@Data
public class Execution <V>{

    private Executor executor;
    private ListenableFutureCallback<V> callback;
}

The last step is to write the thread pool listenablethreadpoolexecutor, which inherits from ThreadPoolExecutor and implements the listenableexecutorservice interface:

public class ListenableThreadPoolExecutor extends ThreadPoolExecutor implements ListenableExecutorService {

    public ListenableThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
        super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue);
    }

    public ListenableThreadPoolExecutor(int corePoolSize,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
        super(corePoolSize,workQueue,threadFactory);
    }

    public ListenableThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) {
        super(corePoolSize,handler);
    }

    public ListenableThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,threadFactory,handler);
    }

    @Override
    public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable) {
        if (null == callable) {
            throw new IllegalArgumentException("callable");
        }
        ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable);
        execute(listenableFutureTask);
        return listenableFutureTask;
    }

    @Override
    public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable,Executor executor) {
        if (null == callable) {
            throw new IllegalArgumentException("callable");
        }
        if (null == callbacks) {
            throw new IllegalArgumentException("callbacks");
        }
        ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable);
        for (ListenableFutureCallback<T> callback : callbacks) {
            listenableFutureTask.addCallback(callback,executor);
        }
        execute(listenableFutureTask);
        return listenableFutureTask;
    }
}

test

Introduce JUnit and write test classes as follows:

public class ListenableFutureTest {

    private static ListenableExecutorService EXECUTOR;
    private static Executor E;

    @BeforeClass
    public static void before() {
        EXECUTOR = new ListenableThreadPoolExecutor(1,3,TimeUnit.SECONDS,new ArrayBlockingQueue<>(10),new ThreadFactory() {

            private final AtomicInteger counter = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName(String.format("ListenableWorker-%d",counter.getAndIncrement()));
                return thread;
            }
        });
        E = Executors.newFixedThreadPool(3);
    }

    @Test
    public void testListenableFuture1() throws Exception {
        ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
            Thread.sleep(1000);
            return "message";
        });
        future.addCallback((v,t) -> {
            System.out.println(String.format("Value = %s,Throwable = %s",v,t));
        },null);
        Thread.sleep(2000);
    }

    @Test
    public void testListenableFuture2() throws Exception {
        ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
            Thread.sleep(1000);
            throw new RuntimeException("exception");
        });
        future.addCallback((v,null);
        Thread.sleep(2000);
    }

    @Test
    public void testListenableFuture3() throws Exception {
        ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
            Thread.sleep(1000);
            return "message";
        });
        future.addCallback((v,E);
        System.out.println("testListenableFuture3 end...");
        Thread.sleep(2000);
    }

    @Test
    public void testListenableFuture4() throws Exception {
        ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
            Thread.sleep(1000);
            throw new RuntimeException("exception");
        });
        future.addCallback((v,E);
        System.out.println("testListenableFuture4 end...");
        Thread.sleep(2000);
    }
}

Execution results:

// testListenableFuture1
Value = message,Throwable = null

// testListenableFuture2
Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception

// testListenableFuture3
testListenableFuture3 end...
Value = message,Throwable = null

// testListenableFuture4
testListenableFuture4 end...
Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception

Consistent with the expected results, note that if the callable execution throws an exception and the exception is wrapped as executionexception, you need to call throwable#getcause() to get the original exception instance.

Summary

By understanding the implementation principle of ThreadPoolExecutor and future, this paper makes a simple extension to make asynchronous submission task more elegant and simple. While strengthening the hands-on ability, it can also deepen some understanding of concurrent programming. Of course, this article only provides a very simple implementation. In fact, the author also thinks of more perfect functions, such as monitoring the time-consuming callback processing, grouping the callback label for execution, and so on. Wait until there is a need for the scene.

Here are some insights from the process:

Personal blog

(c-1-d, e-a-20190702)

The official account of Technology (Throwable Digest), which is not regularly pushed to the original technical article (never copied or copied):

Entertainment official account ("sand sculpture"), select interesting sand sculptures, videos and videos, push them to relieve life and work stress.

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>