Use event listeners as Java 8 stream sources
Quick question
Can a traditional event listener be refactored into a Java 8 stream so that the listener event becomes a stream source?
Long story
A client submits an arbitrary job and listens for its results:
    Client client = new JobClient();
    client.addTaskListener(this);
    client.submitJobAsync(new MultiTaskJob()); // returns void, important (see below)

    public void onTaskResult(TaskResult result) {
        if (result.isLastResult())
            aggregateJobResults(result);
        else
            processResult(result);
    }
The problem
For any submitted job, the client receives n results, but it does not know in advance how many results it will receive (it uses isLastResult() to determine when to stop and aggregate).
The goal
I want to refactor the listener into a "supplier", or something similar, so that onTaskResult() is the stream source:
    Supplier<TaskResult> taskResultSupplier =
        () -> Stream.of( .. ) // onTaskResult() feeds this
              .map(result -> {
                  if (result.isLastResult())
                      // logic here
              });
Something like that. If I can do this without the client knowing how many results to expect, I'm golden. Right now submitJobAsync() returns void. I would like to keep it that way, but I'm also open to other options.
Alternatives
After reading an article by Tomasz Nurkiewicz about a similar scenario with CompletableFutures, I see an alternative option, assuming a minor change to the client:
    List<CompletableFuture<TaskResult>> taskFutures =
            client.submitJobAsync(new MultiTaskJob());
Here the client gets a list of CompletableFuture<TaskResult>, so we need to collect the results of the futures as they complete:
    // Processes each task result as its future completes.
    // Assumes processResult(..) returns the processed TaskResult.
    List<CompletableFuture<TaskResult>> processedFutures = taskFutures.stream()
            .map(taskFuture -> taskFuture.thenApply(this::processResult))
            .collect(Collectors.toList());
The article also explains how to use CompletableFuture.allOf(..) to perform final processing, but only after all futures have completed (it is pretty slick); that is where aggregation would happen in my case. I have no code to show for that here, although the article explains it well (I am a total n00b with streams, but if I get it working I will post the code :-D).
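The allOf(..) idea mentioned above can be sketched roughly as follows. This is only an illustration, not the code from the article: the class name AllOfSketch and the helper allAsList are made up for this example, and plain strings stand in for TaskResult because its definition was not given.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AllOfSketch {

    /**
     * Hypothetical helper: completes once every future in the list has
     * completed and yields their results in list order.
     */
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        // join() does not block here: allDone only completes after every future has.
        return allDone.thenApply(ignored ->
                futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // Stand-in for the futures a changed submitJobAsync(..) might return.
        List<CompletableFuture<String>> taskFutures = IntStream.rangeClosed(1, 3)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> "task-" + i))
                .collect(Collectors.toList());

        // Per-result processing, attached to each future individually.
        List<CompletableFuture<String>> processed = taskFutures.stream()
                .map(future -> future.thenApply(String::toUpperCase))
                .collect(Collectors.toList());

        // Aggregation runs only after all processed futures have completed.
        String aggregated = allAsList(processed)
                .thenApply(results -> String.join(",", results))
                .join();

        System.out.println(aggregated); // prints TASK-1,TASK-2,TASK-3
    }
}
```

With this shape the client still does not need to know n up front; it only needs the list of futures, and the aggregation step plays the role that isLastResult() plays in the listener version.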
Solution
You can build a stream around the TaskResults. Take a look at this example:
    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    /**
     * Created for https://stackoverflow.com/q/27670421/1266906.
     */
    public class AsyncToStream {

        public static void main(String[] args) {
            System.out.println("Unbuffered Test:");
            AsyncTaskResultIterator<TaskResult> taskListener1 = new AsyncTaskResultIterator<>();
            new TaskResultGenerator(taskListener1, 5).start();
            taskListener1.unbufferedStream().forEach(System.out::println);

            System.out.println("Buffered Test:");
            AsyncTaskResultIterator<TaskResult> taskListener2 = new AsyncTaskResultIterator<>();
            new TaskResultGenerator(taskListener2, 5).start();
            taskListener2.bufferedStream().forEach(System.out::println);
        }

        /**
         * This class wraps a sequence of TaskResults into an iterator up to the first TaskResult
         * where {@code isLastResult()} returns {@code true}.
         */
        public static class AsyncTaskResultIterator<T extends TaskResult>
                implements Iterator<T>, TaskListener<T> {

            /**
             * This acts as an asynchronous buffer so we can easily wait for the next TaskResult.
             */
            private final BlockingQueue<T> blockingQueue;

            /**
             * Becomes {@code true} once {@code TaskResult.isLastResult()} is received.
             * Volatile because the producer thread reads it in onTaskResult().
             */
            private volatile boolean ended;

            public AsyncTaskResultIterator() {
                blockingQueue = new LinkedBlockingQueue<>();
            }

            /**
             * Waits on a new TaskResult and returns it as long as the previous TaskResult did not
             * specify {@code isLastResult()}. Afterwards no more elements can be retrieved.
             */
            @Override
            public T next() {
                if (ended) {
                    throw new NoSuchElementException();
                } else {
                    try {
                        T next = blockingQueue.take();
                        ended = next.isLastResult();
                        return next;
                    } catch (InterruptedException e) {
                        throw new IllegalStateException("Could not retrieve next value", e);
                    }
                }
            }

            @Override
            public boolean hasNext() {
                return !ended;
            }

            /**
             * Enqueue another TaskResult for retrieval.
             */
            @Override
            public void onTaskResult(T result) {
                if (ended) {
                    throw new IllegalStateException(
                            "Already received a TaskResult with isLastResult() == true");
                }
                try {
                    blockingQueue.put(result);
                } catch (InterruptedException e) {
                    throw new IllegalStateException("Could not enqueue next value", e);
                }
            }

            /**
             * Builds a Stream that acts upon the results just when they become available.
             */
            public Stream<T> unbufferedStream() {
                Spliterator<T> spliterator = Spliterators.spliteratorUnknownSize(this, 0);
                return StreamSupport.stream(spliterator, false);
            }

            /**
             * Buffers all results and builds a Stream around the results.
             */
            public Stream<T> bufferedStream() {
                Stream.Builder<T> builder = Stream.builder();
                this.forEachRemaining(builder);
                return builder.build();
            }
        }

        public static class TaskResultImpl implements TaskResult {
            private boolean lastResult;
            private String name;

            public TaskResultImpl(boolean lastResult, String name) {
                this.lastResult = lastResult;
                this.name = name;
            }

            @Override
            public String toString() {
                return "TaskResultImpl{" +
                        "lastResult=" + lastResult +
                        ", name='" + name + '\'' +
                        '}';
            }

            @Override
            public boolean isLastResult() {
                return lastResult;
            }
        }

        public static interface TaskListener<T extends TaskResult> {
            public void onTaskResult(T result);
        }

        public static interface TaskResult {
            boolean isLastResult();
        }

        /**
         * Simulates a job: emits count results, 200 ms apart, the final one flagged as last.
         */
        private static class TaskResultGenerator extends Thread {
            private final TaskListener<TaskResult> taskListener;
            private final int count;

            public TaskResultGenerator(TaskListener<TaskResult> taskListener, int count) {
                this.taskListener = taskListener;
                this.count = count;
            }

            @Override
            public void run() {
                try {
                    for (int i = 1; i < count; i++) {
                        Thread.sleep(200);
                        taskListener.onTaskResult(new TaskResultImpl(false, String.valueOf(i)));
                    }
                    Thread.sleep(200);
                    taskListener.onTaskResult(new TaskResultImpl(true, String.valueOf(count)));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
You didn't provide your TaskResult and TaskListener definitions, so I made up my own. An AsyncTaskResultIterator is only good for a single sequence of TaskResults. If no TaskResult with isLastResult() == true is ever supplied, next() will wait endlessly, and so will both the unbuffered and the buffered stream.
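If waiting endlessly is a concern, one possible variation is to poll the queue with a timeout and end the stream when nothing arrives in time. The sketch below is an assumption about how that could look, not part of the code above: the class TimedQueueStream, its stream(..) method, and the isLast predicate are hypothetical names, and strings stand in for TaskResult.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class TimedQueueStream {

    /**
     * Streams elements taken from the queue until one satisfies isLast (inclusive)
     * or until no element arrives within the given timeout.
     */
    static <T> Stream<T> stream(BlockingQueue<T> queue, Predicate<T> isLast,
                                long timeout, TimeUnit unit) {
        Spliterator<T> spliterator =
                new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
                    private boolean ended;

                    @Override
                    public boolean tryAdvance(Consumer<? super T> action) {
                        if (ended) {
                            return false;
                        }
                        try {
                            T next = queue.poll(timeout, unit);
                            if (next == null) {
                                // Timed out: treat the sequence as finished.
                                ended = true;
                                return false;
                            }
                            ended = isLast.test(next);
                            action.accept(next);
                            return true;
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag and end the stream.
                            Thread.currentThread().interrupt();
                            ended = true;
                            return false;
                        }
                    }
                };
        return StreamSupport.stream(spliterator, false);
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b"); // no element marked as last is ever enqueued

        System.out.println(
                stream(queue, s -> s.equals("last"), 100, TimeUnit.MILLISECONDS)
                        .collect(Collectors.toList())); // prints [a, b]
    }
}
```

The trade-off is that a slow producer can now cut the stream short, so the timeout has to be chosen with the job's expected pace in mind.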