У меня есть простые данные в формате json от kafka следующим образом:
{"id":"70f9-6dd3-62e0","status":true,"timestamp":1551172970162}
{"id":"70f9-6dd3-62f5","status":true,"timestamp":1551172970333}
{"id":"70f9-6dd3-62e0","status":false,"timestamp":1551172970786}
{"id":"70f9-6dd3-62f5","status":false,"timestamp":1551172971748}
Я хочу сгруппировать входные данные по id, состоянию и собрать временные метки в виде наборов.
здесьэто код:
Dataset<Row> data = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.option("startingOffsets", "earliest")
.load();
Dataset<Row> json = data.select(from_json(data.col("value").cast("string"), schema));
Dataset<Row> result = json.groupBy(json.col("id"), json.col("status"))
.agg(collect_set("timestamp").alias("timestamps"));
result.writeStream()
.outputMode("complete")
.format("console")
.start();
На консоли ничего не получается.
Я понятия не имею, в чем проблема.Пожалуйста, помогите.