Spark Streaming Dataframe выполняет, сохраняя состояние, разделяет локальную группуBy, избегая перемешиваний - PullRequest
0 голосов
/ 13 марта 2020

Чувствую себя немного потерянным в этот момент.

У меня есть потоковое приложение на основе Spark 2.4.2 и Kafka, которое записывает агрегированный (с временным окном) поток обратно в Kafka:

  • [DF1] потоковый предварительно разделенный Dataframe (по ключу theKey), т. Е. Поток, который гарантирует, что K поступает в один и тот же раздел каждый раз.
  • [DF2] Таблица поиска ( ~ 1000 строк), к которым я присоединяюсь с DF1.
  • GroupBy на основе ключа и непрерывно перемещающегося окна в течение 1 дня.
    DF1.join(DF2, "df1.a" === "df2.b", "left")
       .withWatermark("timestamp", "24 hours")
       .groupBy(window('timestamp, "24 hours"), 'theKey)
       .agg(collect_list('payload) as "payload")

Проблема: перемешивание. С предварительным разбиением набора данных перед этим (в Kafka) я надеялся получить локальный раздел для groupBy. К сожалению, это не сработало.

Вопрос в том, как правильно достичь этого без тасов? Есть ли какие-нибудь?

Решения, которые я исследовал до сих пор:

  1. "agg over window ...": не поддерживается при потоковой передаче (Spark throws: Non-time-based windows are not supported on streaming DataFrames/Datasets)
  2. mapPartitions: Не уверен, как учитывать в State (mapWithState). mapGroupsWithState требуется KeyValueGroupedDataset[K, V], который предоставляется только GroupByKey.

Решения, которые я рассматриваю (неохотно):

  1. mapPartitions на фрейме данных с настраиваемым управлением состоянием. Тем не менее, это делает потоковую передачу Spark с отслеживанием состояния бесполезной.
  2. Каким-то образом подключите исходное разделение ha sh (из фрейма данных Kafka) к Spark, чтобы он позаботился о перемешиваниях навсегда (а не использовал значение по умолчанию *) 1037 *), но я еще не нашел точного источника.

Любая помощь очень ценится!

1 Ответ

0 голосов
/ 14 марта 2020

На самом деле таблицы поиска вызывали все перестановки. Я надеялся, что Spark предпочтет разбить больший набор данных на меньшие таблицы поиска, но это не так. Он взял набор потоковых данных, проигнорировал разделение и переставил их там, где были разделы таблицы поиска.

Как только я перераспределил таблицы поиска в соответствии с потоковым фреймом данных, Спарк был счастлив. Тем не менее, это противоречит интуитивному пониманию того, что Spark не отдает приоритет разделению больших наборов данных над более мелкими.

...