Режим добавления вывода не поддерживается, если потоковые агрегаты в потоковых DataFrames / DataSets без водяных знаков - PullRequest
0 голосов
/ 09 апреля 2020

Я пытаюсь решить мою проблему. У меня есть поток kafka, который я загружаю в Spark. Сообщения от Kafka topi c имеют следующие атрибуты: bl_iban, blacklisted, timestamp. Таким образом, есть IBANS, флаг о том, находится ли IBAN в черном списке (Y / N), а также есть отметка времени этой записи. Дело в том, что для одного IBAN может быть несколько записей, потому что сверхурочные IBAN могут попасть в черный список или «удалить». И что я пытаюсь достичь, так это то, что я хочу знать текущий статус каждого из IBANS. Однако я начал с еще более простой цели, которая заключается в том, чтобы перечислить для каждой последней версии IBAN timestamp (и после этого я хотел бы также добавить статус blacklisted). Поэтому я создал следующий код (где черный список представляет набор данных, который я загружен из Kafka):

blackList = blackList.groupBy("bl_iban")
                .agg(col("bl_iban"), max("timestamp"));

И после этого я попытался распечатать это на консоли, используя следующий код:

StreamingQuery query = blackList.writeStream()
    .format("console")
    .outputMode(OutputMode.Append())
    .start();

Я запустил свой код и получаю следующую ошибку: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

Поэтому я добавил водяной знак в свой набор данных следующим образом:

blackList = blackList.withWatermark("timestamp", "2 seconds")
                .groupBy("bl_iban")
                .agg(col("bl_iban"), max("timestamp"));

И после этого получил ту же ошибку. Любые идеи, как я могу решить эту проблему?


Обновление: с помощью mike мне удалось избавиться от этой ошибки. Но проблема в том, что я все еще не могу заставить свой черный список работать. Я могу видеть, как данные загружаются из Kafka, но после этого из моей групповой операции я получаю две пустые партии, и это все. Печатные данные от Кафки:

+-----------------------+-----------+-----------------------+
|bl_iban                |blacklisted|timestamp              |
+-----------------------+-----------+-----------------------+
|SK047047595122709025789|N          |2020-04-10 17:26:58.208|
|SK341492788657560898224|N          |2020-04-10 17:26:58.214|
|SK118866580129485701645|N          |2020-04-10 17:26:58.215|
+-----------------------+-----------+-----------------------+

Вот как я получил тот черный список, который выводится:

blackList = blackList.selectExpr("split(cast(value as string),',') as value", "cast(timestamp as timestamp) timestamp")
                .selectExpr("value[0] as bl_iban", "value[1] as blacklisted", "timestamp");

И это моя групповая операция:

Dataset<Row> blackListCurrent = blackList.withWatermark("timestamp", "20 minutes")
                .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("bl_iban"))
                .agg(col("bl_iban"), max("timestamp"));

Ссылка на исходный файл: Spark Blacklist

1 Ответ

1 голос
/ 09 апреля 2020

Когда вы используете водяные знаки в Spark, вы должны убедиться, что ваша агрегация знает об окне. Документация Spark предоставляет дополнительную информацию.

В вашем случае код должен выглядеть примерно так:

blackList = blackList.withWatermark("timestamp", "2 seconds")
  .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("bl_iban"))
  .agg(col("bl_iban"), max("timestamp"));

Важно, чтобы атрибут timestamp имел метка времени типа данных!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...