Spark Структурированная потоковая передача и выпуск водяных знаков - PullRequest
0 голосов
/ 25 марта 2020

Все данные поступают из топи Кафки c. Значение столбец преобразуется в строку и анализируется с требуемой схемой. В этой схеме есть столбцы с именем timestamp , и я хотел бы использовать этот столбец в watermark * 1006. * method.

Так как эта потоковая передача с отслеживанием состояния и это UPSERT в mon go, у меня будет дата первого и последнего события. Эти столбцы создаются методом agg .

Этот код ниже является просто debug . Netcat использовался для создания соединения.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val schema: StructType = new StructType().add("id", StringType, nullable = false).add("timestamp", StringType, nullable = false)

val read = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

val typed = read.select(from_json(col("value").cast("string"), schema).as("value")).select(col("value.id"), col("value.timestamp").cast(TimestampType))

val stateful = typed.withWatermark("timestamp", "10 minutes").groupBy("id").agg(min(col("timestamp")).alias("first_event_datetime"), max(col("timestamp")).alias("last_event_datetime"))

val query = stateful.writeStream.outputMode("update").format("console").start()

query.awaitTermination()

Водяной знак определяется как 10 минут, но он не фильтрует данные из состояния . Есть ли ошибка в этой реализации?

Дополнительная информация.

Версия Spark: 2.3.2 .

Режим вывода: обновление .

Пн go Раковина: Upsert для каждой строки.

Состояние: размер увеличивается в среднем на 300 КБ каждые 5 минут .

...