Я использую временную серию через Scala / Spark из темы Кафки.
val dfKafka = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", "topicName") .option("startingOffsets","earliest") .load()
val carJsonDF= dfKafka.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)]
val struct = {new StructType()
.add("car_id", StringType)
.add("time", StringType)
}
val carDF = carJsonDF.select(from_json($"value", struct).as("car")).selectExpr("car.car_id", "car.time")
Данные сортируются по времени с неопределенной периодичностью (т. Е. Между предыдущей и текущей строками нет фиксированного промежутка времени), а значения времени выглядят как 1,0, 1,4, 3,0, 3,9 и т. Д. .
Я бы хотел, чтобы для каждого car_id было прочитано первое число, которое имеет время> 1000;и первый прочитанный ряд, который имеет время> 2000;и первый прочитанный ряд, который имеет время> 3000;и т.д. и запишите их в таблицу кассандры.
Как я могу сделать это эффективным способом? По сути, я хочу просто использовать данные и выдать событие (записать кортеж в cassandra), как только будет прочитан ОДИН кортеж (для каждого car_id) с заданной характеристикой: нет необходимости ждать других кортежей, которые могут удовлетворять условию.
На данный момент я мог бы сказать только о решении, похожем на sql:
SELECT car_id, Min(time)
FROM TableName
where time >1000
group by car_id
, которое, по моему мнению, имеет несколько проблем: 1) его нужно будет повторить для времени = 1000, дляtime = 2000 и т. д. 2) spark должен был бы каким-то образом знать, что нет необходимости ждать других входящих кортежей, как только он может вывести один кортеж. 3) Как мне получить полный кортеж?
Спасибо за любой совет.