Java – understanding the main loop in ForEachTask of the Streams API
It seems that the core of parallelization in Java streams is ForEachTask. Understanding its logic seems crucial to acquiring the mental model needed to anticipate the concurrent behavior of client code written against the Streams API. However, I found that my expectations were contradicted by the actual behavior.
For reference, here is the key compute() method (java/util/stream/ForEachOps.java:253):
public void compute() {
    Spliterator<S> rightSplit = spliterator, leftSplit;
    long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
    if ((sizeThreshold = targetSize) == 0L)
        targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
    boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
    boolean forkRight = false;
    Sink<S> taskSink = sink;
    ForEachTask<S, T> task = this;
    while (!isShortCircuit || !taskSink.cancellationRequested()) {
        if (sizeEstimate <= sizeThreshold ||
            (leftSplit = rightSplit.trySplit()) == null) {
            task.helper.copyInto(taskSink, rightSplit);
            break;
        }
        ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
        task.addToPendingCount(1);
        ForEachTask<S, T> taskToFork;
        if (forkRight) {
            // fork the current task, which still owns rightSplit; continue with the left split
            forkRight = false;
            rightSplit = leftSplit;
            taskToFork = task;
            task = leftTask;
        }
        else {
            // fork the left split; keep splitting rightSplit in this thread
            forkRight = true;
            taskToFork = leftTask;
        }
        taskToFork.fork();
        sizeEstimate = rightSplit.estimateSize();
    }
    task.spliterator = null;
    task.propagateCompletion();
}
At a high level, the main loop keeps splitting the spliterator, alternately forking off the processing of a chunk and processing it inline, until the spliterator refuses to split further or the remaining size falls below the computed threshold.
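To make the branch decisions concrete, here is a single-threaded sketch that replays the loop above for one task on a stream like the one in the appendix: the rest of the stream always reports `Long.MAX_VALUE` (unsized), and each successful split hands back a fixed-size chunk. The class `LoopTrace` and its parameters are hypothetical; nothing is actually forked, the actions are only recorded, and the model assumes a chunk is never split again (batch at or below the threshold).

```java
import java.util.ArrayList;
import java.util.List;

public class LoopTrace {
    /**
     * Hypothetical model of the branch decisions in ForEachTask.compute() for one task.
     * threshold: computed target size; batch: size of each split-off chunk;
     * chunksAvailable: how many chunks trySplit() can still hand out.
     */
    static List<String> trace(long threshold, long batch, int chunksAvailable) {
        if (batch > threshold) throw new IllegalArgumentException("model assumes chunks are not re-split");
        List<String> log = new ArrayList<>();
        long sizeEstimate = Long.MAX_VALUE; // unsized rest: the estimate never shrinks
        int remaining = chunksAvailable;
        boolean forkRight = false;
        boolean holdingChunk = false;       // true once rightSplit refers to a split-off chunk
        while (true) {
            // mirrors: sizeEstimate <= sizeThreshold || trySplit() == null
            if (sizeEstimate <= threshold || remaining == 0) {
                log.add(holdingChunk ? "process chunk inline" : "process rest inline");
                break;
            }
            remaining--;                    // trySplit() returned one fixed-size chunk
            if (forkRight) {
                forkRight = false;
                log.add("fork rest of stream, continue with chunk");
                holdingChunk = true;
                sizeEstimate = batch;       // rightSplit is now the chunk
            } else {
                forkRight = true;
                log.add("fork chunk, continue with rest");
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Long.MAX_VALUE / 12 assumes a common-pool parallelism of 3
        trace(Long.MAX_VALUE / 12, 200, 30).forEach(System.out::println);
    }
}
```

In this model, the task that starts with the whole stream forks one chunk, then forks the rest of the stream, and finishes by processing a single chunk inline.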
Now consider the above algorithm in the case of unsized streams, where the whole is not split into roughly equal halves; instead, chunks of a predetermined size are repeatedly taken from the head of the stream. In this case the "suggested target size" of a chunk is abnormally large, which basically means that the chunks are never re-split into smaller ones.
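Why the threshold is so large can be sketched in a few lines. The method below is an assumption: it re-creates what JDK 8's `AbstractTask.suggestTargetSize` appears to do (divide the size estimate by four times the common-pool parallelism), and other JDK versions may differ. `BufferedReader.lines()` has no known size, so its spliterator reports `Long.MAX_VALUE`, and the resulting threshold dwarfs a 200-element chunk.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.Spliterator;
import java.util.concurrent.ForkJoinPool;

public class TargetSizeSketch {
    // Assumption: mirrors JDK 8's AbstractTask.suggestTargetSize with
    // LEAF_TARGET = 4 * common-pool parallelism.
    static long suggestTargetSize(long sizeEstimate, int parallelism) {
        long leafTarget = (long) Math.max(1, parallelism) << 2;
        long est = sizeEstimate / leafTarget;
        return est > 0L ? est : 1L;
    }

    public static void main(String[] args) {
        // An unsized source, like the one the question wraps
        Spliterator<String> lines =
            new BufferedReader(new StringReader("a\nb\nc\n")).lines().spliterator();
        long threshold = suggestTargetSize(lines.estimateSize(),
                                           ForkJoinPool.getCommonPoolParallelism());
        System.out.println("SIZED? " + lines.hasCharacteristics(Spliterator.SIZED)
                           + ", estimate = " + lines.estimateSize());
        System.out.println("threshold = " + threshold);
        // A chunk is split again only if its estimate exceeds the threshold
        System.out.println("re-split a 200-element chunk? " + (200 > threshold));
    }
}
```

For comparison, with a common-pool parallelism of 3 (the default on a four-core machine), a SIZED stream of 6,000 elements would get a threshold of 500, whereas the unsized estimate pushes it to roughly 7.7 × 10^17.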
The algorithm would therefore seem to alternate between forking off one chunk and processing one inline. If each chunk takes the same time to process, no more than two cores should be used. However, the actual behavior is that all four cores on my machine are occupied. Obviously, I am missing an important piece of the puzzle in this algorithm.
What am I missing?
Appendix: test code
Here is stand-alone code (two classes) that reproduces the behavior discussed in this question:
package test;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static test.FixedBatchSpliteratorWrapper.withFixedSplits;

import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class Parallelization {
    static final AtomicLong totalTime = new AtomicLong();
    static final ExecutorService pool = Executors.newFixedThreadPool(4); // not used by this test

    public static void main(String[] args) throws IOException {
        final long start = System.nanoTime();
        final Path inputPath = createInput();
        System.out.println("Start processing");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
            // parallel stream over the lines, split into fixed batches of 200 lines
            withFixedSplits(Files.newBufferedReader(inputPath).lines(), 200)
                .map(Parallelization::processLine)
                .forEach(w::println);
        }
        final double cpuTime = totalTime.get(), realTime = System.nanoTime() - start;
        final int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(" Cores: " + cores);
        System.out.format(" CPU time: %.2f s\n", cpuTime / SECONDS.toNanos(1));
        System.out.format(" Real time: %.2f s\n", realTime / SECONDS.toNanos(1));
        System.out.format("CPU utilization: %.2f%%", 100.0 * cpuTime / realTime / cores);
    }

    private static String processLine(String line) {
        final long localStart = System.nanoTime();
        double ret = 0;
        // CPU-heavy busy work: line.length()^2 calls to Math.pow per line
        for (int i = 0; i < line.length(); i++)
            for (int j = 0; j < line.length(); j++)
                ret += Math.pow(line.charAt(i), line.charAt(j) / 32.0);
        final long took = System.nanoTime() - localStart;
        totalTime.getAndAdd(took);
        return NANOSECONDS.toMillis(took) + " " + ret;
    }

    private static Path createInput() throws IOException {
        final Path inputPath = Paths.get("input.txt");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
            for (int i = 0; i < 6_000; i++) {
                final String text = String.valueOf(System.nanoTime());
                for (int j = 0; j < 20; j++)
                    w.print(text);
                w.println();
            }
        }
        return inputPath;
    }
}
package test;

import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
    private final Spliterator<T> spliterator;
    private final int batchSize;
    private final int characteristics;
    private long est;

    public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
        final int c = toWrap.characteristics();
        this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
        this.spliterator = toWrap;
        this.batchSize = batchSize;
        this.est = est;
    }

    public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
        this(toWrap, toWrap.estimateSize(), batchSize);
    }

    public static <T> Stream<T> withFixedSplits(Stream<T> in, int batchSize) {
        return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
    }

    @Override public Spliterator<T> trySplit() {
        final HoldingConsumer<T> holder = new HoldingConsumer<>();
        if (!spliterator.tryAdvance(holder)) return null;
        // copy up to batchSize elements from the head into an array-backed chunk
        final Object[] a = new Object[batchSize];
        int j = 0;
        do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
        if (est != Long.MAX_VALUE) est -= j;
        return spliterator(a, 0, j, characteristics());
    }

    @Override public boolean tryAdvance(Consumer<? super T> action) {
        return spliterator.tryAdvance(action);
    }

    @Override public void forEachRemaining(Consumer<? super T> action) {
        spliterator.forEachRemaining(action);
    }

    @Override public Comparator<? super T> getComparator() {
        if (hasCharacteristics(SORTED)) return null;
        throw new IllegalStateException();
    }

    @Override public long estimateSize() { return est; }

    @Override public int characteristics() { return characteristics; }

    // Remembers the last element passed to accept()
    static final class HoldingConsumer<T> implements Consumer<T> {
        Object value;

        @Override public void accept(T value) { this.value = value; }
    }
}
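The batching policy of `trySplit()` above can be checked in isolation. The sketch below is a hypothetical, trimmed-down re-implementation (not the wrapper class itself, and the names `FixedBatchDemo`, `takeBatch` and `chunkSizes` are made up): it repeatedly takes at most `batchSize` elements from the head of an unsized spliterator and records the size of each array-backed chunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FixedBatchDemo {
    // Takes up to batchSize elements from the head of source, in the spirit of the
    // wrapper's trySplit(); returns null once the source is exhausted.
    static <T> Spliterator<T> takeBatch(Spliterator<T> source, int batchSize) {
        Object[] a = new Object[batchSize];
        int[] j = {0};
        while (j[0] < batchSize && source.tryAdvance(x -> a[j[0]] = x)) {
            j[0]++;
        }
        if (j[0] == 0) return null;
        return Spliterators.spliterator(a, 0, j[0], Spliterator.ORDERED);
    }

    static List<Long> chunkSizes(int elements, int batchSize) {
        List<Integer> data = IntStream.range(0, elements).boxed().collect(Collectors.toList());
        // Unknown size, like BufferedReader.lines(): estimateSize() is Long.MAX_VALUE
        Spliterator<Integer> unsized =
            Spliterators.spliteratorUnknownSize(data.iterator(), Spliterator.ORDERED);
        List<Long> sizes = new ArrayList<>();
        Spliterator<Integer> chunk;
        while ((chunk = takeBatch(unsized, batchSize)) != null) {
            sizes.add(chunk.estimateSize()); // array-backed chunks are SIZED, so this is exact
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(450, 200)); // [200, 200, 50]
    }
}
```

With the 6,000 input lines generated by the test and a batch size of 200, this yields 30 chunks of 200 lines each.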
Solution
Ironically, the answer is almost stated in the question itself: because the "left" and "right" tasks take turns at being forked versus processed inline, half of the time it is the right task, representing the complete rest of the stream, that gets forked off. Whichever thread picks it up continues splitting chunks off the head of the stream. This means that the forking off of chunks is only slowed down a little (it happens on every other split), but it clearly does happen.
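The effect can be reproduced with a deliberately crude, deterministic model. The sketch below is a hypothetical simulation, not the ForkJoinPool algorithm: `workers` threads pull tasks from one shared FIFO queue (the real pool uses per-worker deques with work stealing), splitting costs nothing, and every chunk takes one tick. With `forkRest = false` the owner of the stream keeps the rest forever and only forks chunks, which is the model from the question; with `forkRest = true` it hands the rest back to the queue on every other split, as `compute()` does. The method reports the peak number of chunks processed in the same tick.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ForkSimulation {
    enum Task { CHUNK, REST } // a fixed-size chunk, or the unsplit rest of the stream

    static int peakBusy(int workers, int totalChunks, boolean forkRest) {
        Deque<Task> queue = new ArrayDeque<>();
        queue.add(Task.REST);
        boolean[] keepsRest = new boolean[workers]; // only set when forkRest == false
        int unsplit = totalChunks;                  // chunks still inside the rest
        int processed = 0, peak = 0;
        while (processed < totalChunks) {
            int busy = 0;
            for (int w = 0; w < workers; w++) {
                Task task;
                if (keepsRest[w]) task = Task.REST;
                else if (!queue.isEmpty()) task = queue.poll();
                else continue;                      // nothing to do: worker idles this tick
                keepsRest[w] = false;
                if (task == Task.CHUNK) { busy++; processed++; continue; }
                // forkRight == false: split off a chunk and fork it
                if (unsplit > 0) { unsplit--; queue.add(Task.CHUNK); }
                // forkRight == true: split off another chunk and process it inline
                if (unsplit > 0) {
                    unsplit--; busy++; processed++;
                    if (unsplit > 0) {
                        if (forkRest) queue.add(Task.REST); // compute(): fork the rest
                        else keepsRest[w] = true;           // question's model: keep the rest
                    }
                }
            }
            peak = Math.max(peak, busy);
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println("keep the rest: peak " + peakBusy(4, 100, false)); // 2
        System.out.println("fork the rest: peak " + peakBusy(4, 100, true));  // 4
    }
}
```

The absolute numbers mean nothing; the point is the shape. As long as an idle worker can pick up the rest of the stream, it splits off work for yet another idle worker, so every core ends up busy, which matches the four occupied cores observed in the question.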