Я хотел бы объединить разные столбцы из разных фреймов данных, избегая добавления индекса и присоединяясь к ним.
public static void count() throws StreamingQueryException {
SparkSession session = SparkSession.builder().appName("streamFromKafka").master("local[*]").getOrCreate();
Dataset<Row> df = session.readStream().format("kafka")
.option("group.id","test-consumer-group")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test").load();
df.printSchema();
StructField word = DataTypes.createStructField("word", DataTypes.StringType, true);
StructField timestamp = DataTypes.createStructField("tstamp", DataTypes.LongType, true);
StructType schema = DataTypes.createStructType(Arrays.asList(word, timestamp));
Dataset<Row> df1 = df.selectExpr("CAST(key AS STRING) as KEY", "CAST(value AS STRING) AS value", "CAST(timestamp AS TIMESTAMP) AS timestamp");
Dataset<Row> df2 = df1.select(functions.from_json(functions.col("value"), schema).as("WORD"), functions.col("timestamp"));
df2.printSchema();
Dataset<Row> df3 = df2.selectExpr("WORD.word AS w", "timestamp");
df3.printSchema();
//Let's say we have to do some sort of mapping on column "w", for example, the following:
//Dataset<Boolean> ttt = df3.repartition(1, df3.col("w"), df3.col("timestamp")).map(
// line-> {
// int number = Integer.parseInt(line.getString(0));
// return (number % 2 != 0);
// }, Encoders.BOOLEAN());
//How can I merge ttt with df3["timestamp"] into odds?
//Dataset<Row> odds = ...
Dataset<Row> df4 = odds.groupBy(functions.window(odds.col("timestamp"), "10 seconds", "5 seconds"), odds.col("w")).count();
StreamingQuery query1 = df4.writeStream().format("console").option("truncate", false).outputMode("complete").trigger(Trigger.ProcessingTime(10000)).start();
query1.awaitTermination();
}
Очевидно, я мог бы добавить индексировать и присоединить фреймы данных, как описано здесь или я мог бы зарегистрировать пользовательскую функцию, которая действует на столбец "w", конечно.Однако я не хотел бы использовать эти методы.Я хотел бы знать, как объединить два кадра данных в общей ситуации.
Я попытался использовать javaRDD или RDD, что кажется удобным.Однако при выполнении следующих вызовов:
dataframe.toJavaRDD();
dataframe.rdd();
я получаю следующее исключение:
Запросы с потоковыми источниками должны выполняться с помощью writeStream.start () ;;