Java Apache Spark: long transformation chain leads to quadratic time

I have a Java program that uses Apache Spark. The most interesting parts of the program look like this:

long seed = System.nanoTime();

JavaRDD<AnnotatedDocument> annotated = documents
    .mapPartitionsWithIndex(new InitialAnnotater(seed), true);
annotated.cache();

for (int iter = 0; iter < 2000; iter++) {
    GlobalCounts counts = annotated
        .mapPartitions(new GlobalCounter())
        .reduce((a,b) -> a.sum(b)); // update overall counts (*)

    seed = System.nanoTime();

    // copy overall counts which CountChanger uses to compute a stochastic thing (**)
    annotated = annotated
        .mapPartitionsWithIndex(new CountChanger(counts, seed), true);
    annotated.cache();

    // adding these lines gives the constant per-iteration time I want
    //List<AnnotatedDocument> ll = annotated.collect();
    //annotated = sc.parallelize(ll,8); 
}

So, in effect, line (**) produces an RDD of the form

documents
    .mapPartitionsWithIndex(initial)
    .mapPartitionsWithIndex(nextIter)
    .mapPartitionsWithIndex(nextIter)
    .mapPartitionsWithIndex(nextIter)
    ... 2000 more

That's a really long map chain. In addition, because the counts have to be updated, line (*) forces computation (it is not lazy) at each iteration.

The problem I run into is that the time per iteration grows linearly, so the total running time is roughly quadratic.

I think this is because Spark tries to "remember" every RDD in the chain, plus whatever it needs for fault tolerance, and that is what keeps growing. But I really don't know.
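
For what it's worth, I can watch the chain grow by printing the lineage with toDebugString() (just a quick sanity check, not part of the real program):

// the printed lineage gains one mapPartitionsWithIndex step per iteration
System.out.println(annotated.toDebugString());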

What I would really like is a way to tell Spark to "collapse" the RDD at each iteration, so that only the latest one is kept in memory and worked on. That should, I think, give the same time per iteration. Is this possible? Are there other solutions?

Thank you!

Solution

Try using rdd.checkpoint(). This will save the RDD to HDFS and clear the lineage.

The lineage grows every time an RDD is transformed: Spark has to keep track of what is already available and what has to be recomputed. Processing a large DAG is expensive, and big DAGs tend to kill performance very quickly. By checkpointing you instruct Spark to compute and save the resulting RDD and to discard the information about how it was created. That makes it similar to simply saving an RDD and reading it back, which minimizes the DAG work.
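
Applied to your loop, that could look roughly like this (a sketch, untested; the checkpoint interval of 10 and the HDFS path are arbitrary placeholders):

sc.setCheckpointDir("hdfs:///tmp/checkpoints"); // example path, set once before the loop

for (int iter = 0; iter < 2000; iter++) {
    GlobalCounts counts = annotated
        .mapPartitions(new GlobalCounter())
        .reduce((a, b) -> a.sum(b));

    seed = System.nanoTime();
    annotated = annotated
        .mapPartitionsWithIndex(new CountChanger(counts, seed), true);
    annotated.cache();

    // every few iterations, materialize the RDD and drop its lineage
    if (iter % 10 == 0) {
        annotated.checkpoint();
        annotated.count(); // the checkpoint is written on the next action, so force one
    }
}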

As a side note, since you ran into this problem, it is good to know that union also affects RDD performance by adding steps to the lineage, and can even throw a StackOverflowError because of the way the lineage information is kept. See this post.
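
For illustration (a rough sketch; makeBatch is a made-up helper standing in for whatever produces each batch), the same lineage growth happens when an RDD is built by repeated union, and periodic checkpointing is the usual way around it:

JavaRDD<AnnotatedDocument> all = sc.emptyRDD();

for (int i = 0; i < 10000; i++) {
    all = all.union(makeBatch(i)); // each union adds another step to the lineage

    if (i % 100 == 0) {
        all.checkpoint(); // periodically truncate the lineage
        all.count();      // checkpointing takes effect on the next action
    }
}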

This link has more details and nice diagrams; the subject is also mentioned in this SO post.
