Как объединить два фрейма данных с API структурированной потоковой передачи Java Spark, не добавляя индекс и присоединяясь к ним? - PullRequest
0 голосов
/ 13 декабря 2018

Я хотел бы объединить разные столбцы из разных фреймов данных, избегая добавления индекса и присоединяясь к ним.

public static void count() throws StreamingQueryException {
    SparkSession session = SparkSession.builder().appName("streamFromKafka").master("local[*]").getOrCreate();

    Dataset<Row> df = session.readStream().format("kafka")
            .option("group.id","test-consumer-group")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "test").load();

    df.printSchema();

    StructField word = DataTypes.createStructField("word", DataTypes.StringType, true);
    StructField timestamp = DataTypes.createStructField("tstamp", DataTypes.LongType, true);
    StructType schema = DataTypes.createStructType(Arrays.asList(word, timestamp));

    Dataset<Row> df1 = df.selectExpr("CAST(key AS STRING) as KEY", "CAST(value AS STRING) AS value", "CAST(timestamp AS TIMESTAMP) AS timestamp");

    Dataset<Row> df2 = df1.select(functions.from_json(functions.col("value"), schema).as("WORD"), functions.col("timestamp"));

    df2.printSchema();

    Dataset<Row> df3 = df2.selectExpr("WORD.word AS w", "timestamp");

    df3.printSchema();

    //Let's say we have to do some sort of mapping on column "w", for example, the following:

    //Dataset<Boolean> ttt = df3.repartition(1, df3.col("w"), df3.col("timestamp")).map(
    //  line-> {
    //      int number = Integer.parseInt(line.getString(0));
    //      return (number % 2 != 0);
    //  }, Encoders.BOOLEAN());

    //How can I merge ttt with df3["timestamp"] into odds?

    //Dataset<Row> odds = ...

    Dataset<Row> df4 = odds.groupBy(functions.window(odds.col("timestamp"), "10 seconds", "5 seconds"), odds.col("w")).count();

    StreamingQuery query1 = df4.writeStream().format("console").option("truncate", false).outputMode("complete").trigger(Trigger.ProcessingTime(10000)).start();

    query1.awaitTermination();


}

Очевидно, я мог бы добавить индексировать и присоединить фреймы данных, как описано здесь или я мог бы зарегистрировать пользовательскую функцию, которая действует на столбец "w", конечно.Однако я не хотел бы использовать эти методы.Я хотел бы знать, как объединить два кадра данных в общей ситуации.

Я попытался использовать javaRDD или RDD, что кажется удобным.Однако при выполнении следующих вызовов:

dataframe.toJavaRDD();
dataframe.rdd();

я получаю следующее исключение:

Запросы с потоковыми источниками должны выполняться с помощью writeStream.start () ;;

...