Java 8 – why does a filter with side effects perform better than a Spliterator-based implementation?
To skip even lines of a stream obtained from Files.lines, I followed the method from the accepted answer to "How to skip even lines of a Stream obtained from the Files.lines" and implemented my own filterEven() method based on the Spliterator<T> interface, for example:
public static <T> Stream<T> filterEven(Stream<T> src) {
    Spliterator<T> iter = src.spliterator();
    AbstractSpliterator<T> res = new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            iter.tryAdvance(item -> {});    // discard
            return iter.tryAdvance(action); // use
        }
    };
    return StreamSupport.stream(res, false);
}
I can use it in the following way:
Stream<String> content = Files.lines(src);
Stream<DomainObject> res = filterEven(content)
        .map(line -> toDomainObject(line));
However, when I measured the performance of this approach against the following one, which uses filter() with side effects, the latter performed better:
final int[] counter = {0};
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
Stream<DomainObject> res = Files.lines(src)
        .filter(isEvenLine)
        .map(line -> toDomainObject(line));
I tested the performance with JMH, leaving the file load out of the benchmark: I load the file into an array beforehand. Each benchmark then creates a Stream<String> from that array, filters the even lines, applies mapToInt() to extract the value of an int field, and finishes with a max() terminal operation. This is one of the benchmarks (you can check the whole program here; here is the data file, with about 186 lines):
@Benchmark
public int maxTempFilterEven(DataSource src) {
    Stream<String> content = Arrays.stream(src.data)
            .filter(s -> s.charAt(0) != '#') // Filter comments
            .skip(1);                        // Skip line: Not available
    return filterEven(content)               // Filter daily info and skip hourly
            .mapToInt(line -> parseInt(line.substring(14, 16)))
            .max()
            .getAsInt();
}
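For context, here is a minimal sketch of what the DataSource fixture could look like (the real one is in the linked program; the file name and field layout here are my assumptions): a JMH @State object that loads the file into an array once during setup, keeping the I/O out of the measured code.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
public static class DataSource {
    String[] data;

    @Setup
    public void setup() throws IOException {
        // "data.txt" is a placeholder for the linked data file
        data = Files.readAllLines(Paths.get("data.txt"))
                .toArray(new String[0]);
    }
}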
Why does the filter()-based version perform better (~80 ops/ms) than the filterEven() version (~50 ops/ms)?
Solution
Introduction
I think I know the reason, but unfortunately I have no idea how to improve the performance of the Spliterator-based solution (at least without rewriting the whole Stream API machinery).
Sidenote 1: performance was not the most important design goal of the Stream API. If performance is critical, rewriting the code without the Stream API will most probably make it faster (for example, the Stream API inevitably increases memory allocation and thus GC pressure). On the other hand, in most scenarios the Stream API provides a nicer, higher-level API at the cost of a relatively small performance degradation.
Part 1, or the short theoretical answer
Streams were designed with internal iteration as the primary means of consumption; external iteration (i.e. Spliterator-based) is an add-on that is "emulated" on top of it, so external iteration involves some overhead. Laziness adds its own limits to the efficiency of external iteration, and the need to support flatMap makes it necessary to use some kind of dynamic buffer in the process.
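To make the distinction concrete, here is a small sketch of my own (not from the benchmark) showing the same elements consumed by internal iteration and by the "emulated" external iteration through a Spliterator:

import java.util.Spliterator;
import java.util.stream.Stream;

public class IterationStyles {
    public static void main(String[] args) {
        // Internal iteration: the stream drives the loop and pushes
        // elements into the action.
        Stream.of("a", "b", "c").forEach(System.out::println);

        // External iteration: the caller drives the loop by pulling
        // elements one at a time through the Spliterator.
        Spliterator<String> sp = Stream.of("a", "b", "c").spliterator();
        while (sp.tryAdvance(System.out::println)) {
            // each tryAdvance pulls at most one element
        }
    }
}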
Sidenote 2: in some cases Spliterator-based iteration can be as fast as internal iteration (i.e. filter in this case), particularly when you create the Spliterator directly from a stream that already contains all the data. To see it, modify your tests to materialize the first filter into a String array:
String[] filteredData = Arrays.stream(src.data)
        .filter(s -> s.charAt(0) != '#') // Filter comments
        .skip(1)
        .toArray(String[]::new);
Then compare the performance of maxTempFilter and maxTempFilterEven when they accept the pre-filtered String[] filteredData. If you want to know why this makes such a difference, read the rest of this long answer, or at least Part 2.
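For illustration, a modified benchmark could look roughly like this, assuming a hypothetical src.filteredData field that holds the pre-filtered array from the snippet above (the name is mine, not from the linked program):

@Benchmark
public int maxTempFilterEvenPrefiltered(DataSource src) {
    // filteredData is assumed to be materialized once during setup, so
    // filterEven sits directly on top of the array's own spliterator
    return filterEven(Arrays.stream(src.filteredData))
            .mapToInt(line -> parseInt(line.substring(14, 16)))
            .max()
            .getAsInt();
}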
Part 2, or the longer theoretical answer:
Streams were designed to be consumed primarily via some terminal operation. Iterating over the elements, although supported, is not how a stream is meant to be consumed.
Note that using the "functional" stream API (map, flatMap, filter, reduce, collect and so on) you cannot say at some step: "I have had enough data, stop iterating over the source and pushing values". You can discard incoming data (as filter does), but you can't stop the iteration. (The take and skip transformations actually use Spliterators; anyMatch, allMatch, noneMatch, findFirst, findAny, etc. use the non-public API j.u.s.Sink.cancellationRequested; it is also easier for them because there can be no further terminal operation.) If all the transformations in the pipeline are synchronous, you can combine them into a single aggregated function (Consumer) and call it in a simple loop (optionally splitting the loop execution over several threads). This is what my simplified version of the state-based filter represents (see the code in the "Show me some code" section below). It gets a bit more complicated if there is a flatMap in the pipeline, but the idea is still the same.
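As an illustration of that idea (my own sketch, not the JDK's actual code), fusing a filter stage and a map stage into one aggregated Consumer driven by a plain loop could look like this:

import java.util.function.Consumer;

public class FusedPipeline {
    public static void main(String[] args) {
        String[] data = {"# comment", "keep me", "keep me too"};

        // All synchronous stages combined into a single Consumer ...
        Consumer<String> fused = line -> {
            if (line.charAt(0) != '#') {           // filter stage
                String upper = line.toUpperCase(); // map stage
                System.out.println(upper);         // terminal stage
            }
        };

        // ... called in a simple loop: the source drives the iteration
        for (String line : data) {
            fused.accept(line);
        }
    }
}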
A Spliterator-based transformation is fundamentally different, because it adds an asynchronous, consumer-driven step to the pipeline: now the Spliterator, rather than the source stream, drives the iteration. If you request a Spliterator directly on the source stream, it may be able to return an implementation that simply iterates over its internal data structure; this is why materializing the pre-filtered data eliminates the performance difference. However, if you create a Spliterator for a non-empty pipeline, there is no other (simple) choice than asking the source to push elements one by one through the pipeline until some element passes all the filters (see also the second example below). The fact that source elements are pushed one by one, rather than in some batches, is a consequence of the fundamental decision to make streams lazy. The need for a buffer, rather than a single element, is a consequence of supporting flatMap: pushing one element from the source can produce many elements for the Spliterator.
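The flatMap point is easy to observe directly. In this small demonstration of mine, one source element expands into three elements, yet each tryAdvance call hands out only one, so the rest have to wait in the wrapping Spliterator's buffer:

import java.util.Arrays;
import java.util.Spliterator;
import java.util.stream.Stream;

public class FlatMapBuffer {
    public static void main(String[] args) {
        Spliterator<String> sp = Stream.of("a,b,c")
                .flatMap(s -> Arrays.stream(s.split(",")))
                .spliterator();

        // The single source element "a,b,c" produces three elements;
        // each tryAdvance delivers one, the rest stay buffered.
        sp.tryAdvance(System.out::println); // a
        sp.tryAdvance(System.out::println); // b
        sp.tryAdvance(System.out::println); // c
    }
}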
Part 3, or show me some code
This section tries to back up what was described in the "theory" parts with code: both pointers into the actual implementation and a simulated version of it.
First of all, you should know that the current Stream API implementation accumulates the non-terminal (intermediate) operations into a lazy pipeline (see j.u.s.AbstractPipeline and its children such as j.u.s.ReferencePipeline). Then, when a terminal operation is applied, all the elements of the original stream are "pushed" through the pipeline.
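A quick way to observe this accumulate-then-push behavior (my own minimal example):

import java.util.stream.Stream;

public class LazyPipeline {
    public static void main(String[] args) {
        Stream<String> s = Stream.of("x", "y")
                .filter(e -> {
                    System.out.println("filtering " + e);
                    return true;
                });
        // Nothing has been printed yet: filter was merely recorded
        // as a stage of the lazy pipeline.
        System.out.println("pipeline built");
        s.forEach(e -> System.out.println("consumed " + e)); // elements pushed now
    }
}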
What you see is the result of two things:
> The fact that the stream pipeline is different in your case, because there is a Spliterator-based step in it.
> The fact that your oddLines is not the first step in the pipeline.
The code with the stateful filter is more or less equivalent to the following straightforward code:
static int similarToFilter(String[] data) {
    final int[] counter = {0};
    final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
    int skip = 1;
    boolean reduceEmpty = true;
    int reduceState = 0;
    for (String outerEl : data) {
        if (outerEl.charAt(0) != '#') {
            if (skip > 0)
                skip--;
            else {
                if (isEvenLine.test(outerEl)) {
                    int intEl = parseInt(outerEl.substring(14, 16));
                    if (reduceEmpty) {
                        reduceState = intEl;
                        reduceEmpty = false;
                    } else {
                        reduceState = Math.max(reduceState, intEl);
                    }
                }
            }
        }
    }
    return reduceState;
}
Note that this is effectively a single loop with some computations (filtering/transformation) inside it.
On the other hand, when you add a Spliterator step into the pipeline, things change significantly: even simplified code that resembles what actually happens becomes much larger. For example:
interface Sp<T> {
    public boolean tryAdvance(Consumer<? super T> action);
}

static class ArraySp<T> implements Sp<T> {
    private final T[] array;
    private int pos;

    public ArraySp(T[] array) {
        this.array = array;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (pos < array.length) {
            action.accept(array[pos]);
            pos++;
            return true;
        } else {
            return false;
        }
    }
}

static class WrappingSp<T> implements Sp<T>, Consumer<T> {
    private final Sp<T> sourceSp;
    private final Predicate<T> filter;
    private final ArrayList<T> buffer = new ArrayList<T>();
    private int pos;

    public WrappingSp(Sp<T> sourceSp, Predicate<T> filter) {
        this.sourceSp = sourceSp;
        this.filter = filter;
    }

    @Override
    public void accept(T t) {
        buffer.add(t);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        while (true) {
            if (pos >= buffer.size()) {
                pos = 0;
                buffer.clear();
                sourceSp.tryAdvance(this);
            }
            // Failed to fill buffer
            if (buffer.size() == 0)
                return false;
            T nextElem = buffer.get(pos);
            pos++;
            if (filter.test(nextElem)) {
                action.accept(nextElem);
                return true;
            }
        }
    }
}

static class OddLineSp<T> implements Sp<T>, Consumer<T> {
    private Sp<T> sourceSp;

    public OddLineSp(Sp<T> sourceSp) {
        this.sourceSp = sourceSp;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (sourceSp == null)
            return false;

        sourceSp.tryAdvance(this);
        if (!sourceSp.tryAdvance(action)) {
            sourceSp = null;
        }
        return true;
    }

    @Override
    public void accept(T t) {
    }
}

static class ReduceIntMax {
    boolean reduceEmpty = true;
    int reduceState = 0;

    public int getReduceState() {
        return reduceState;
    }

    public void accept(int t) {
        if (reduceEmpty) {
            reduceEmpty = false;
            reduceState = t;
        } else {
            reduceState = Math.max(reduceState, t);
        }
    }
}

static int similarToSpliterator(String[] data) {
    ArraySp<String> src = new ArraySp<>(data);

    int[] skip = new int[1];
    skip[0] = 1;
    WrappingSp<String> firstFilter = new WrappingSp<String>(src, (s) -> {
        if (s.charAt(0) == '#')
            return false;
        if (skip[0] != 0) {
            skip[0]--;
            return false;
        }
        return true;
    });
    OddLineSp<String> oddLines = new OddLineSp<>(firstFilter);
    final ReduceIntMax reduceIntMax = new ReduceIntMax();
    while (oddLines.tryAdvance(s -> {
        int intValue = parseInt(s.substring(14, 16));
        reduceIntMax.accept(intValue);
    })) ; // do nothing in the loop body
    return reduceIntMax.getReduceState();
}
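To see both simulations in action, here is a usage sketch of mine with made-up input shaped like the benchmark data: a comment, a header line for skip to drop, and data lines whose columns 14-16 hold a two-digit value. It can be placed in the same class as the methods above:

public static void main(String[] args) {
    String[] data = {
            "# a comment dropped by the first filter",
            "header line dropped by skip...",
            "hourly row....10", // discarded by the odd-line step
            "daily row.....42", // kept
            "hourly row....99", // discarded
            "daily row.....17"  // kept
    };
    // Both approaches should agree on the maximum of the kept values
    System.out.println(similarToFilter(data));      // 42
    System.out.println(similarToSpliterator(data)); // 42
}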
This code is larger because the logic is impossible (or at least very hard) to express without some non-trivial stateful callbacks inside the loop. Here the interface Sp is a mix of the j.u.s.Stream and j.u.Spliterator interfaces.
> The class ArraySp represents the result of Arrays.stream.
> The class WrappingSp is similar to j.u.s.StreamSpliterators.WrappingSpliterator, which in the real code represents the Spliterator implementation for any non-empty pipeline, i.e. a stream with at least one intermediate operation applied to it (see the j.u.s.AbstractPipeline.spliterator method). In my code I merged it with a StatelessOp subclass and put into it the logic responsible for the filter method implementation. Also, for simplicity, I implemented skip using filter.
> OddLineSp corresponds to your oddLines and its resulting stream.
> ReduceIntMax represents the ReduceOps terminal operation for Math.max over ints.
So what is important in this example? The important thing here is that since you filter the original stream first, your OddLineSp is created from a non-empty pipeline (WrappingSp). And if you take a closer look at WrappingSp, you'll notice that every time tryAdvance is called, it delegates the call to sourceSp and accumulates the result(s) into a buffer. Moreover, since there is no flatMap in your pipeline, elements are copied to the buffer one by one. That is, every time WrappingSp.tryAdvance is called, it calls ArraySp.tryAdvance to get back exactly one element (via the callback) and passes it further to the consumer provided by the caller (unless the element doesn't match the filter, in which case ArraySp.tryAdvance is called again and again; but still the buffer never contains more than one element at a time).
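In terms of the simulated classes, a single call can be traced roughly like this (annotations are mine):

// oddLines.tryAdvance(action)
//   -> firstFilter.tryAdvance(oddLines)      // "discard" step
//        loop: buffer exhausted -> clear it
//              src.tryAdvance(firstFilter)   // buffer now holds one element
//              filter rejects it -> loop again: refill with the next element
//              filter accepts it -> element goes to OddLineSp's no-op accept()
//   -> firstFilter.tryAdvance(action)        // "use" step, same one-element dance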
Sidenote 3: if you want to look at the real code, the most interesting places are j.u.s.StreamSpliterators.WrappingSpliterator.tryAdvance, which calls j.u.s.StreamSpliterators.AbstractWrappingSpliterator.doAdvance, which in turn calls j.u.s.StreamSpliterators.AbstractWrappingSpliterator.fillBuffer, which in turn calls the pusher initialized in j.u.s.StreamSpliterators.WrappingSpliterator.initPartialTraversalState.
So the main performance killer here is the copying into the buffer. Unfortunately for us, the current Stream API implementation is pretty much closed to ordinary Java developers: you can't just use inheritance or composition to modify some aspects of the internal behavior. You might use some reflection-based hacks to make the copying-to-buffer more efficient for your specific case and gain some performance (at the cost of the stream's laziness), but you can't avoid the copying altogether, so Spliterator-based code will be slower in any case.
Going back to the example from Sidenote #2: the Spliterator-based test over the materialized filteredData works faster because there is no WrappingSp in the pipeline before the OddLineSp, and thus there is no copying into an intermediate buffer.