Java – Apache spark lambda expression – serialization problem
I tried to use a lambda expression in the spark task and threw a "Java. Lang. illegalargumentexception: invalid lambda deserialization" exception This exception is thrown when the code is like "transform (prdd - > prdd. Map (T - > t. _2))" The code snippet is as follows
JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y); JavaDStream<Integer> con = aggregate.transform( (Function<JavaPairRDD<String,Integer>,JavaRDD<Integer>>)pRDD-> pRDD.map( (Function<Tuple2<String,Integer>)t->t._2)); JavaPairDStream<String,JavaRDD<Integer>> & Serializable)pRDD-> pRDD.map( (Function<Tuple2<String,Integer> & Serializable)t->t._2));
The above two options did not work It seems that I passed the object "F" as a parameter instead of the lambda expression "t - > t_. 2" Useful
Function f = new Function<Tuple2<String,Integer>(){
@Override
public Integer call(Tuple2<String,Integer> paramT1) throws Exception {
return paramT1._2;
}
};
I can see the correct format for representing this function as a lambda expression
public static void main(String[] args) {
Function f = new Function<Tuple2<String,Integer>(){
@Override
public Integer call(Tuple2<String,Integer> paramT1) throws Exception {
return paramT1._2;
}
};
JavaStreamingContext ssc = JavaStreamingFactory.getInstance();
JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost",9999);
JavaDStream<String> words = lines.flatMap(s->{return Arrays.asList(s.split(" "));});
JavaPairDStream<String,Integer> pairRDD = words.mapToPair(x->new Tuple2<String,Integer>(x,1));
JavaPairDStream<String,y)->x+y);
JavaDStream<Integer> con = aggregate.transform(
(Function<JavaPairRDD<String,JavaRDD<Integer>>)pRDD-> pRDD.map(
(Function<Tuple2<String,Integer>)t->t._2));
//JavaDStream<Integer> con = aggregate.transform(pRDD-> pRDD.map(f)); It works
con.print();
ssc.start();
ssc.awaitTermination();
}
Solution
I don't know why lambda doesn't work Maybe the problem is that lambda is nested in lambda This seems to be recognized by spark documents
contrast http://spark.apache.org/docs/latest/programming-guide.html#basics Examples:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a,b) -> a + b);
with http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform -Operation as an example:
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
final JavaPairRDD<String,Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
JavaPairDStream<String,Integer> cleanedDStream = wordCounts.transform(
new Function<JavaPairRDD<String,JavaPairRDD<String,Integer>>() {
@Override public JavaPairRDD<String,Integer> call(JavaPairRDD<String,Integer> rdd) throws Exception {
rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
...
}
});
The second example uses the function subclass instead of lambda, probably because you found the same problem
I don't know if this is useful for you, but nested Lambdas are definitely suitable for Scala Consider the scala version of the previous example:
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
})
