Java Spark Streaming JSON parsing
I have started to learn Spark Streaming, and I am new to data analysis and Spark in general. I want to create a small IoT application in which I predict future data.
I have TIVA hardware which publishes real-time sensor data as JSON, like this:
[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}]
Here t is the UNIX timestamp of the published data, sensors is an array of sensors, and each sensor has an id 's' and a data value 'd'.
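For reference, here is a minimal, stdlib-only sketch of pulling the sensor readings out of one such payload. It uses java.util.regex instead of a real JSON library, and the class and method names are purely illustrative:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SensorPayload {
    // Matches one {"s":"s1","d":"+253.437"} entry inside the sensors array
    private static final Pattern SENSOR =
            Pattern.compile("\\{\"s\":\"([^\"]+)\",\"d\":\"([^\"]+)\"\\}");

    // Returns sensor id -> numeric reading for a single payload string.
    // Double.parseDouble accepts the leading '+' sign in the data values.
    public static Map<String, Double> parse(String json) {
        Map<String, Double> readings = new LinkedHashMap<>();
        Matcher m = SENSOR.matcher(json);
        while (m.find()) {
            readings.put(m.group(1), Double.parseDouble(m.group(2)));
        }
        return readings;
    }

    public static void main(String[] args) {
        String payload = "[{\"t\":1478091719000,\"sensors\":["
                + "{\"s\":\"s1\",\"d\":\"+253.437\"},"
                + "{\"s\":\"s2\",\"d\":\"+129.750\"}]}]";
        System.out.println(parse(payload));
    }
}
```

In a real pipeline you would use a JSON library (or Spark's own JSON reader, as in the answer below) rather than a regex, but this shows the shape of the data you are extracting.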
What I want to do is consume this data as Spark Streaming objects, and then feed all of it through Spark's MLlib (machine learning) or an equivalent library to predict future data.
I want to know whether this is feasible with the technologies I have chosen:
> Is the stack I decided on suitable?
> How do I handle the nested JSON? I tried to use SQLContext but failed.
> General guidelines to achieve what I'm trying to do here.
This is the code I use to consume messages from Kafka:
SparkConf conf = new SparkConf().setAppName("DattusSpark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
// TODO: processing pipeline
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "kafkaserver_address:9092"); // key must be lowercase
Set<String> topics = Collections.singleton("RAH");
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(
        ssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topics);
JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> message) throws Exception {
        System.out.println(message._2());
        return message._2();
    }
});
json.foreachRDD(rdd -> {
    rdd.foreach(record -> System.out.println(record));
});
ssc.start();
ssc.awaitTermination();
PS: I want to do this in Java, to keep the codebase consistent and performant.
Solution
Since you are using Spark 2.0, you can read the JSON through the SparkSession:
json.foreachRDD(rdd -> {
    Dataset<Row> df = spark.read().json(rdd);
    // process the JSON with this DataFrame
});
Alternatively, you can convert the RDD to an RDD of Row and then use the createDataFrame method:
json.foreachRDD(rdd -> {
    // rowRdd is a JavaRDD<Row> built from the parsed records;
    // createDataFrame also needs a StructType schema describing the rows
    Dataset<Row> df = spark.createDataFrame(rowRdd, schema);
    // process the JSON with this DataFrame
});
The DataFrame can handle nested JSON; you can process it as described in this article.
In addition, once you have converted your JSON to a DataFrame, you can use it from any Spark module (such as Spark SQL or ML).
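To illustrate what that buys you: the nested sensors array is typically flattened so that each reading becomes its own row (in Spark SQL this is what the explode function does on an array column). Here is a plain-Java sketch of that flattening, with illustrative class and method names, so you can see the target shape before wiring it into Spark:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class Flatten {
    // One flattened row: timestamp plus a single sensor's reading
    static class Reading {
        final long t;
        final String sensor;
        final double value;
        Reading(long t, String sensor, double value) {
            this.t = t; this.sensor = sensor; this.value = value;
        }
        @Override public String toString() {
            return t + "," + sensor + "," + value;
        }
    }

    // Turns one (timestamp, sensors[]) record into one Reading per sensor,
    // mirroring what explode(sensors) does on a DataFrame.
    // Each String[] holds {sensorId, dataValue}.
    static List<Reading> flatten(long t, List<String[]> sensors) {
        List<Reading> rows = new ArrayList<>();
        for (String[] s : sensors) {
            rows.add(new Reading(t, s[0], Double.parseDouble(s[1])));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String[]> sensors = Arrays.asList(
                new String[]{"s1", "+253.437"},
                new String[]{"s2", "+129.750"});
        for (Reading r : flatten(1478091719000L, sensors)) {
            System.out.println(r);
        }
    }
}
```

A flat (timestamp, sensor, value) table like this is also the shape MLlib expects when you assemble features for prediction.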