Цель:
Я хочу загрузить потоковые данные, затем добавить ключ и затем подсчитать их по ключу.
Проблема:
Конвейер Apache Beam Dataflow получает ошибку памяти, когда я пытаюсь загрузить и сгруппировать по ключу данные большого размера, используя потоковый подход (неограниченные данные)
, Потому что кажется, что данные накапливаются по группам, и они не запускают данные раньше при запуске каждого окна.
Если я уменьшу размер элементов (количество элементов не изменится), это сработает! потому что на самом деле шаг за шагом ожидает группировки всех данных, а затем запускает все новые оконные данные.
Я тестировал оба:
версия луча 2.11.0 и версия scio 0.7.4
версия луча 2.6.0 и версия scio 0.6.1
Способ восстановления ошибки:
- Прочитать сообщение Pubsub, содержащее имя файла
- Считывание и загрузка связанного файла из GCS в виде построчного итератора
- Сглаживать построчно (так что генерируется около 10 000) элементов
- Добавить метки времени (текущее мгновенное время) к элементам
- Создать значение ключа из моих данных (с некоторыми случайными целочисленными ключами от 1 до 10)
- Применить окно с триггером (оно будет срабатывать примерно 50 раз в случае, если строки малы и нет проблем с памятью)
- Количество на ключ (сгруппируйте по ключу, затем объедините их)
- Наконец, у нас должно быть около 50 * 10 элементов, представляющих счетчики по окну и ключу (успешно протестировано, когда размер строк достаточно мал)
Визуализация конвейера (шаги с 4 по 7):
![enter image description here](https://i.stack.imgur.com/3Ux8X.png)
Сводка для шага по группам:
![enter image description here](https://i.stack.imgur.com/4aSgA.png)
Как видите, данные накапливаются поэтапно и не отправляются.
Код окна здесь:
val windowedData = data.applyKvTransform(
Window.into[myt](
Sessions.withGapDuration(Duration.millis(1)))
.triggering(
Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(10),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1)))
).orFinally(AfterWatermark.pastEndOfWindow())
).withAllowedLateness(Duration.standardSeconds(100))
.discardingFiredPanes()
)
Ошибка:
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and key 2 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException.causedBy(StreamingDataflowWorker.java:230)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Есть ли какое-либо решение для решения проблемы с памятью, возможно, путем принудительного группирования для выдачи ранних результатов каждого окна.