Я пытаюсь решить мою проблему. У меня есть поток kafka, который я загружаю в Spark. Сообщения от Kafka topi c имеют следующие атрибуты: bl_iban
, blacklisted
, timestamp
. Таким образом, есть IBANS, флаг о том, находится ли IBAN в черном списке (Y / N), а также есть отметка времени этой записи. Дело в том, что для одного IBAN может быть несколько записей, потому что сверхурочные IBAN могут попасть в черный список или «удалить». И что я пытаюсь достичь, так это то, что я хочу знать текущий статус каждого из IBANS. Однако я начал с еще более простой цели, которая заключается в том, чтобы перечислить для каждой последней версии IBAN timestamp
(и после этого я хотел бы также добавить статус blacklisted
). Поэтому я создал следующий код (где черный список представляет набор данных, который я загружен из Kafka):
blackList = blackList.groupBy("bl_iban")
.agg(col("bl_iban"), max("timestamp"));
И после этого я попытался распечатать это на консоли, используя следующий код:
StreamingQuery query = blackList.writeStream()
.format("console")
.outputMode(OutputMode.Append())
.start();
Я запустил свой код и получаю следующую ошибку: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
Поэтому я добавил водяной знак в свой набор данных следующим образом:
blackList = blackList.withWatermark("timestamp", "2 seconds")
.groupBy("bl_iban")
.agg(col("bl_iban"), max("timestamp"));
И после этого получил ту же ошибку. Любые идеи, как я могу решить эту проблему?
Обновление: с помощью mike мне удалось избавиться от этой ошибки. Но проблема в том, что я все еще не могу заставить свой черный список работать. Я могу видеть, как данные загружаются из Kafka, но после этого из моей групповой операции я получаю две пустые партии, и это все. Печатные данные от Кафки:
+-----------------------+-----------+-----------------------+
|bl_iban |blacklisted|timestamp |
+-----------------------+-----------+-----------------------+
|SK047047595122709025789|N |2020-04-10 17:26:58.208|
|SK341492788657560898224|N |2020-04-10 17:26:58.214|
|SK118866580129485701645|N |2020-04-10 17:26:58.215|
+-----------------------+-----------+-----------------------+
Вот как я получил тот черный список, который выводится:
blackList = blackList.selectExpr("split(cast(value as string),',') as value", "cast(timestamp as timestamp) timestamp")
.selectExpr("value[0] as bl_iban", "value[1] as blacklisted", "timestamp");
И это моя групповая операция:
Dataset<Row> blackListCurrent = blackList.withWatermark("timestamp", "20 minutes")
.groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("bl_iban"))
.agg(col("bl_iban"), max("timestamp"));
Ссылка на исходный файл: Spark Blacklist