Java Spark Streaming JSON parsing

I have started to learn Spark Streaming as part of the Spark engine, and I am new to data analysis and Spark. I just want to create a small IoT application in which I want to predict future data.

I have TIVA hardware which sends real-time sensor data as JSON, as follows:

[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}]

Here 't' is the UNIX timestamp of the published data, 'sensors' is an array of sensors, and the reading of each sensor ('s') is in 'd'.
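For what it is worth, that payload maps cleanly onto a couple of plain Java classes. Below is a minimal sketch of deserializing one record; the class and field names are my own, and it assumes Jackson is available:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.List;

    // Illustrative model classes mirroring the payload; the names are hypothetical.
    class SensorReading {
        public String s;   // sensor id, e.g. "s1"
        public String d;   // reading as a signed decimal string, e.g. "+253.437"
    }

    class SensorMessage {
        public long t;                      // UNIX timestamp in milliseconds
        public List<SensorReading> sensors; // one entry per sensor
    }

    class ParseExample {
        public static void main(String[] args) throws Exception {
            String payload = "[{\"t\":1478091719000,\"sensors\":[{\"s\":\"s1\",\"d\":\"+253.437\"}]}]";
            // The device publishes a JSON array, so read it as an array of messages.
            SensorMessage[] messages = new ObjectMapper().readValue(payload, SensorMessage[].class);
            System.out.println(messages[0].t + " " + messages[0].sensors.get(0).d);
        }
    }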

What I want to do is consume these data as Spark Streaming objects, and then pass all of the data through Spark's MLlib (machine learning) or an equivalent library to predict future data.

I would like to know:

> Is the technology stack I have decided to use feasible?
> How do I handle the nested JSON? I tried to use SQLContext but failed.
> What are the general guidelines to achieve what I am trying to do here?

This is the code I use to consume messages from Kafka

    SparkConf conf = new SparkConf().setAppName("DattusSpark").setMaster("local[2]");

    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

    // TODO: processing pipeline
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "kafkaserver_address:9092");
    Set<String> topics = Collections.singleton("RAH");

    // Direct stream from Kafka: both key and value are Strings decoded with StringDecoder
    JavaPairInputDStream<String, String> directKafkaStream =
            KafkaUtils.createDirectStream(ssc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, kafkaParams, topics);

    // Keep only the message value, i.e. the JSON payload
    JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> message) throws Exception {
            System.out.println(message._2());
            return message._2();
        }
    });

    System.out.println(" json is  0------ 0" + json);

    json.foreachRDD(rdd -> {
        rdd.foreach(record -> System.out.println(record));
    });

    ssc.start();
    ssc.awaitTermination();

PS: I want to do this in Java so everything stays in one language, and for good performance.

Solution

Since you are using Spark 2.0, you can read the JSON with a SparkSession:

    json.foreachRDD(rdd -> {
        Dataset<Row> df = spark.read().json(rdd);
        // process the JSON with this DataFrame
    });
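One thing the snippet above glosses over is where the spark variable comes from: inside foreachRDD you can lazily get or create a SparkSession from the RDD's configuration. A minimal sketch in Java (the variable names are mine):

    json.foreachRDD(rdd -> {
        // Reuse (or lazily create) a SparkSession from the streaming RDD's configuration.
        SparkSession spark = SparkSession.builder()
                .config(rdd.context().getConf())
                .getOrCreate();

        // Each record in the RDD is one JSON string published by the device.
        Dataset<Row> df = spark.read().json(rdd);
        df.printSchema();   // shows t plus the nested sensors array
        df.show(false);
    });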

Alternatively, you can convert the RDD to an RDD of Row and then use the createDataFrame method:

    json.foreachRDD(rdd -> {
        // first convert rdd to a JavaRDD<Row> (rowRDD) and build a matching schema, then:
        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        // process the JSON with this DataFrame
    });
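For this second route, SparkSession.createDataFrame in Java needs either a JavaRDD of JavaBeans plus the bean class, or a JavaRDD<Row> plus an explicit schema. Here is a rough sketch of the Row-plus-schema variant; the field names and the parsing stub are assumptions on my part:

    json.foreachRDD(rdd -> {
        SparkSession spark = SparkSession.builder()
                .config(rdd.context().getConf())
                .getOrCreate();

        // Turn each JSON string into a Row; the actual parsing is left as a stub here.
        JavaRDD<Row> rowRDD = rdd.map(jsonString -> {
            long t = 0L;            // placeholder: extract the timestamp from jsonString
            String sensorId = "";   // placeholder: extract the sensor id
            double value = 0.0;     // placeholder: extract the numeric reading
            return RowFactory.create(t, sensorId, value);
        });

        // Schema matching the Rows built above.
        StructType schema = new StructType(new StructField[] {
                DataTypes.createStructField("t", DataTypes.LongType, false),
                DataTypes.createStructField("sensor", DataTypes.StringType, false),
                DataTypes.createStructField("value", DataTypes.DoubleType, false)
        });

        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.show();
    });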

The DataFrame can handle the nested JSON, and you can work with it as described in this article.
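For the nested sensors array in particular, the usual pattern is to explode it so you get one row per sensor reading. A small sketch, assuming df is the DataFrame read from the JSON above (casting the 'd' string to a double is my own addition):

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.explode;

    // One row per entry of the sensors array, with the fields pulled out of the struct.
    Dataset<Row> flattened = df
            .withColumn("sensor", explode(col("sensors")))
            .select(
                    col("t"),
                    col("sensor.s").alias("sensorId"),
                    col("sensor.d").cast("double").alias("value"));   // "+253.437" -> 253.437

    flattened.show();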

In addition, once you convert your JSON into a DataFrame, you can use it with any Spark module (such as Spark SQL or Spark ML).
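To illustrate that last point, once the readings sit in a flattened DataFrame like the one above, they can be fed to Spark ML. The following is only an illustrative sketch; using the raw timestamp as the single feature and the sensor value as the label is an assumption, not a real forecasting setup:

    import org.apache.spark.ml.feature.VectorAssembler;
    import org.apache.spark.ml.regression.LinearRegression;
    import org.apache.spark.ml.regression.LinearRegressionModel;

    // Assemble the timestamp into a feature vector; Spark ML expects a "features" column.
    VectorAssembler assembler = new VectorAssembler()
            .setInputCols(new String[] {"t"})
            .setOutputCol("features");

    // Treat the sensor value as the label to predict.
    Dataset<Row> training = assembler.transform(flattened)
            .withColumnRenamed("value", "label");

    // Fit a simple linear regression; a real forecast would need proper time-series features.
    LinearRegressionModel model = new LinearRegression().fit(training);
    model.transform(training).select("t", "label", "prediction").show();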
