Java – Reader#lines() parallelizes badly due to the non-configurable batch size policy in its spliterator

When the stream source is a Reader, I cannot achieve good parallelization of stream processing. Running the following code on a quad-core CPU, I observe three cores being used at first, then a sudden drop to two cores, and then to one. Overall CPU utilization is about 50%.

Note the following characteristics of the example:

- there are only 6,000 lines;
- each line takes about 20 ms to process;
- the whole run takes about a minute.

This means that all the pressure is on the CPU and I/O is minimal. The example is a sitting duck for automatic parallelization.

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

... class imports elided ...    

public class Main
{
  static final AtomicLong totalTime = new AtomicLong();

  public static void main(String[] args) throws IOException {
    final long start = System.nanoTime();
    final Path inputPath = createinput();
    System.out.println("Start processing");

    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
      Files.lines(inputPath).parallel().map(Main::processLine)
        .forEach(w::println);
    }

    final double cpuTime = totalTime.get(),realTime = System.nanoTime()-start;
    final int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("          Cores: " + cores);
    System.out.format("       cpu time: %.2f s\n",cpuTime/SECONDS.toNanos(1));
    System.out.format("      Real time: %.2f s\n",realTime/SECONDS.toNanos(1));
    System.out.format("cpu utilization: %.2f%%",100.0*cpuTime/realTime/cores);
  }

  private static String processLine(String line) {
    final long localStart = System.nanoTime();
    double ret = 0;
    for (int i = 0; i < line.length(); i++)
      for (int j = 0; j < line.length(); j++)
        ret += Math.pow(line.charAt(i),line.charAt(j)/32.0);
    final long took = System.nanoTime()-localStart;
    totalTime.getAndAdd(took);
    return NANOSECONDS.toMillis(took) + " " + ret;
  }

  private static Path createinput() throws IOException {
    final Path inputPath = Paths.get("input.txt");
    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
      for (int i = 0; i < 6_000; i++) {
        final String text = String.valueOf(System.nanoTime());
        for (int j = 0; j < 25; j++) w.print(text);
        w.println();
      }
    }
    return inputPath;
  }
}

My typical output:

          Cores: 4
       cpu time: 110.23 s
      Real time: 53.60 s
cpu utilization: 51.41%

For comparison, if I use a slightly modified variant that first collects the lines into a list and then processes the list:

Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
  .forEach(w::println);

I get this typical output:

          Cores: 4
       cpu time: 138.43 s
      Real time: 35.00 s
cpu utilization: 98.87%

What explains this effect, and how can I solve the problem?

Note that I first observed this on a Reader over a servlet input stream, so it is not specific to FileReader.

Solution

The answer lies in the source code of Spliterators.IteratorSpliterator, the spliterator used by BufferedReader#lines():

@Override
    public Spliterator<T> trySplit() {
        /*
         * Split into arrays of arithmetically increasing batch
         * sizes.  This will only improve parallel performance if
         * per-element Consumer actions are more costly than
         * transferring them into an array.  The use of an
         * arithmetic progression in split sizes provides overhead
         * vs parallelism bounds that do not particularly favor or
         * penalize cases of lightweight vs heavyweight element
         * operations, across combinations of #elements vs #cores,
         * whether or not either are known.  We generate
         * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
         * potential speedup.
         */
        Iterator<? extends T> i;
        long s;
        if ((i = it) == null) {
            i = it = collection.iterator();
            s = est = (long) collection.size();
        }
        else
            s = est;
        if (s > 1 && i.hasNext()) {
            int n = batch + BATCH_UNIT;
            if (n > s)
                n = (int) s;
            if (n > MAX_BATCH)
                n = MAX_BATCH;
            Object[] a = new Object[n];
            int j = 0;
            do { a[j] = i.next(); } while (++j < n && i.hasNext());
            batch = j;
            if (est != Long.MAX_VALUE)
                est -= j;
            return new ArraySpliterator<>(a, 0, j, characteristics);
        }
        return null;
    }

Also noteworthy are the constants:

static final int BATCH_UNIT = 1 << 10;  // batch array size increment
static final int MAX_BATCH = 1 << 25;  // max batch array size;

So in my example, where I use 6,000 elements, I get just three batches because the batch size increment is 1024: the batches hold 1024, 2048, and the remaining 2928 elements. This precisely explains my observation: three cores are used initially, dropping to two and then one as the smaller batches complete. In the meantime, I tried a modified example with 60,000 elements, and then I got almost 100% CPU utilization.
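The batch arithmetic can be checked with a small simulation. This is only a sketch: the class name BatchSizes and the method batchSizes are made up for illustration, and the loop mirrors the size calculation in the trySplit code quoted above for a source of unknown size, without touching the JDK internals.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSizes {
    // Constants copied from the JDK source quoted above.
    static final int BATCH_UNIT = 1 << 10;
    static final int MAX_BATCH = 1 << 25;

    // Hypothetical helper: sizes of the batches that repeated trySplit()
    // calls would carve out of a source holding `elements` items.
    public static List<Integer> batchSizes(int elements) {
        List<Integer> sizes = new ArrayList<>();
        int batch = 0;              // size of the previous batch
        int remaining = elements;   // items still left in the iterator
        while (remaining > 0) {
            int n = Math.min(batch + BATCH_UNIT, MAX_BATCH);
            int taken = Math.min(n, remaining); // the iterator may run dry early
            sizes.add(taken);
            batch = taken;
            remaining -= taken;
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(6_000));         // prints [1024, 2048, 2928]
        System.out.println(batchSizes(60_000).size()); // prints 11
    }
}
```

With 6,000 elements there are only three units of work for four cores, and the largest one holds almost half the input. With 60,000 elements there are eleven batches, which gives the scheduler enough pieces to keep all cores busy for most of the run.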

To solve my problem, I developed the code below, which converts any existing stream into one whose Spliterator#trySplit partitions it into batches of a specified size. The simplest way to use it for the use case from my question is:

toFixedBatchStream(Files.newBufferedReader(inputPath).lines(),20)

At a lower level, the class below is a spliterator wrapper that changes the trySplit behavior of the wrapped spliterator and leaves other aspects unchanged.

import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
  private final Spliterator<T> spliterator;
  private final int batchSize;
  private final int characteristics;
  private long est;

  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap,long est,int batchSize) {
    final int c = toWrap.characteristics();
    this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
    this.spliterator = toWrap;
    this.est = est;
    this.batchSize = batchSize;
  }
  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap,int batchSize) {
    this(toWrap,toWrap.estimateSize(),batchSize);
  }

  public static <T> Stream<T> toFixedBatchStream(Stream<T> in,int batchSize) {
    return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(),batchSize),true);
  }

  @Override public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!spliterator.tryAdvance(holder)) return null;
    final Object[] a = new Object[batchSize];
    int j = 0;
    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) est -= j;
    // Expose only the j filled slots so a short final batch has no trailing nulls.
    return spliterator(a, 0, j, characteristics());
  }
  @Override public boolean tryAdvance(Consumer<? super T> action) {
    return spliterator.tryAdvance(action);
  }
  @Override public void forEachRemaining(Consumer<? super T> action) {
    spliterator.forEachRemaining(action);
  }
  @Override public Comparator<? super T> getComparator() {
    if (hasCharacteristics(SORTED)) return null;
    throw new IllegalStateException();
  }
  @Override public long estimateSize() { return est; }
  @Override public int characteristics() { return characteristics; }

  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;
    @Override public void accept(T value) { this.value = value; }
  }
}
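
To see how a given spliterator actually partitions its input, one can drive trySplit() by hand and record the size of each split-off prefix. The following is a minimal sketch; the class SplitProbe and its helpers splitSizes and lines are hypothetical names introduced here, and the input is an in-memory StringReader rather than a file.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Spliterator;

public class SplitProbe {
    // Hypothetical helper: calls trySplit() until it returns null and
    // records the size estimate of every prefix that was split off.
    public static List<Long> splitSizes(Spliterator<?> source) {
        List<Long> sizes = new ArrayList<>();
        Spliterator<?> prefix;
        while ((prefix = source.trySplit()) != null) {
            sizes.add(prefix.estimateSize());
        }
        return sizes;
    }

    // Builds `count` short lines of text for the reader to consume.
    public static String lines(int count) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < count; i++) sb.append("line ").append(i).append('\n');
        return sb.toString();
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new StringReader(lines(6_000)));
        // Batches split off from the reader-backed stream.
        System.out.println(splitSizes(reader.lines().spliterator()));

        // For contrast: an ArrayList splits in half on the first call.
        List<String> list = new ArrayList<>(Collections.nCopies(6_000, "x"));
        System.out.println(list.spliterator().trySplit().estimateSize()); // prints 3000
    }
}
```

On a JDK whose IteratorSpliterator matches the source quoted earlier, the first line of output should show a few batches whose sizes grow by 1024 per split. The ArrayList contrast is what makes the collect-to-list variant from the question parallelize well. Passing a FixedBatchSpliteratorWrapper with a batch size of 20 to the same helper should instead report batches of 20.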