Конвейер луча не производит никакого вывода после GroupByKey с обработкой окон, и я получил ошибку памяти - PullRequest
3 голосов
/ 12 апреля 2019

Цель:

Я хочу загрузить потоковые данные, затем добавить ключ и затем подсчитать их по ключу.

Проблема:

Конвейер Apache Beam Dataflow получает ошибку памяти, когда я пытаюсь загрузить и сгруппировать по ключу данные большого размера, используя потоковый подход (неограниченные данные) , Потому что кажется, что данные накапливаются по группам, и они не запускают данные раньше при запуске каждого окна.

Если я уменьшу размер элементов (количество элементов не изменится), это сработает! потому что на самом деле шаг за шагом ожидает группировки всех данных, а затем запускает все новые оконные данные.

Я тестировал оба:

версия луча 2.11.0 и версия scio 0.7.4

версия луча 2.6.0 и версия scio 0.6.1

Способ восстановления ошибки:

  1. Прочитать сообщение Pubsub, содержащее имя файла
  2. Считывание и загрузка связанного файла из GCS в виде построчного итератора
  3. Сглаживать построчно (так что генерируется около 10 000) элементов
  4. Добавить метки времени (текущее мгновенное время) к элементам
  5. Создать значение ключа из моих данных (с некоторыми случайными целочисленными ключами от 1 до 10)
  6. Применить окно с триггером (оно будет срабатывать примерно 50 раз в случае, если строки малы и нет проблем с памятью)
  7. Количество на ключ (сгруппируйте по ключу, затем объедините их)
  8. Наконец, у нас должно быть около 50 * 10 элементов, представляющих счетчики по окну и ключу (успешно протестировано, когда размер строк достаточно мал)

Визуализация конвейера (шаги с 4 по 7):

enter image description here

Сводка для шага по группам:

enter image description here

Как видите, данные накапливаются поэтапно и не отправляются.

Код окна здесь:

val windowedData = data.applyKvTransform(
  Window.into[myt](
    Sessions.withGapDuration(Duration.millis(1)))
    .triggering(
      Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast(10),
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1)))

      ).orFinally(AfterWatermark.pastEndOfWindow())

    ).withAllowedLateness(Duration.standardSeconds(100))
    .discardingFiredPanes()

)

Ошибка:

org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and key 2 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException.causedBy(StreamingDataflowWorker.java:230)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)

Есть ли какое-либо решение для решения проблемы с памятью, возможно, путем принудительного группирования для выдачи ранних результатов каждого окна.

1 Ответ

3 голосов
/ 16 апреля 2019

KeyCommitTooLargeException - это не проблема с памятью, а проблема с сериализацией protobuf. Protobuf имеет ограничение в 2 ГБ для объекта ( максимальный размер google protobuf ). Поток данных обнаружил, что значение одного ключа в конвейере было больше 2 ГБ, поэтому он не мог перетасовать данные. Сообщение об ошибке указывает, что «это может быть вызвано группированием очень большого объема данных в одном окне без использования объединения или созданием большого объема данных из одного элемента ввода». Исходя из настроек вашего конвейера (т. Е. Назначенных случайных ключей), скорее всего, последний.

Возможно, конвейер прочитал большой файл (> 2 ГБ) из GCS и назначил его случайному ключу. GroupByKey требует операции тасования клавиш, а поток данных не удалось выполнить из-за ограничения protobuf, поэтому он застрял на этом ключе и удерживает водяной знак.

Если один ключ имеет большое значение, вы можете уменьшить размер значения, например, сжать строку или разбить строку на несколько ключей или сгенерировать файл GCS меньшего размера.

Если большое значение связано с группировкой нескольких ключей, возможно, вы захотите увеличить пространство клавиш, чтобы каждая группа по ключевым операциям заканчивала тем, что группировала меньше ключей вместе.

...