Все данные поступают из топи Кафки c. Значение столбец преобразуется в строку и анализируется с требуемой схемой. В этой схеме есть столбцы с именем timestamp , и я хотел бы использовать этот столбец в watermark * 1006. * method.
Так как эта потоковая передача с отслеживанием состояния и это UPSERT в mon go, у меня будет дата первого и последнего события. Эти столбцы создаются методом agg .
Этот код ниже является просто debug . Netcat
использовался для создания соединения.
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val schema: StructType = new StructType().add("id", StringType, nullable = false).add("timestamp", StringType, nullable = false)
val read = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
val typed = read.select(from_json(col("value").cast("string"), schema).as("value")).select(col("value.id"), col("value.timestamp").cast(TimestampType))
val stateful = typed.withWatermark("timestamp", "10 minutes").groupBy("id").agg(min(col("timestamp")).alias("first_event_datetime"), max(col("timestamp")).alias("last_event_datetime"))
val query = stateful.writeStream.outputMode("update").format("console").start()
query.awaitTermination()
Водяной знак определяется как 10 минут, но он не фильтрует данные из состояния . Есть ли ошибка в этой реализации?
Дополнительная информация.
Версия Spark: 2.3.2 .
Режим вывода: обновление .
Пн go Раковина: Upsert для каждой строки.
Состояние: размер увеличивается в среднем на 300 КБ каждые 5 минут .