Spark streaming - прослушивает первый входящий ряд, содержащий условие - PullRequest
0 голосов
/ 14 октября 2019

Я использую временную серию через Scala / Spark из темы Кафки.

val dfKafka = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", "topicName") .option("startingOffsets","earliest") .load()
val carJsonDF= dfKafka.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)]

val struct = {new StructType()
.add("car_id", StringType)
.add("time", StringType)
}

val carDF = carJsonDF.select(from_json($"value", struct).as("car")).selectExpr("car.car_id", "car.time")

Данные сортируются по времени с неопределенной периодичностью (т. Е. Между предыдущей и текущей строками нет фиксированного промежутка времени), а значения времени выглядят как 1,0, 1,4, 3,0, 3,9 и т. Д. .

Я бы хотел, чтобы для каждого car_id было прочитано первое число, которое имеет время> 1000;и первый прочитанный ряд, который имеет время> 2000;и первый прочитанный ряд, который имеет время> 3000;и т.д. и запишите их в таблицу кассандры.

Как я могу сделать это эффективным способом? По сути, я хочу просто использовать данные и выдать событие (записать кортеж в cassandra), как только будет прочитан ОДИН кортеж (для каждого car_id) с заданной характеристикой: нет необходимости ждать других кортежей, которые могут удовлетворять условию.

На данный момент я мог бы сказать только о решении, похожем на sql:

SELECT car_id, Min(time)
FROM TableName
where time >1000
group by car_id

, которое, по моему мнению, имеет несколько проблем: 1) его нужно будет повторить для времени = 1000, дляtime = 2000 и т. д. 2) spark должен был бы каким-то образом знать, что нет необходимости ждать других входящих кортежей, как только он может вывести один кортеж. 3) Как мне получить полный кортеж?

Спасибо за любой совет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...