Apache Beam KafkaIO BatchMode OOM выпуск - PullRequest
0 голосов
/ 11 декабря 2019

У меня есть случай, когда я хочу прочитать данные из Kafka в пакетном режиме, используя Apache Beam с Spark Runner.

Используя withMaxNumRecords(long) метод класса KafkaIO, можносделать BoundedReader из UnboundedReader. Но я обнаружил, что в пакетном режиме данные сначала считываются из каждого раздела, помещаются в память, а затем передаются следующему действию (карта, фильтр и т. Д.).

У меня огромные данные в каждом разделе, и при чтении этих данных в пакетном режиме я получаю ошибку OOM. Я пытался увеличить память исполнителя. Но для каждого запуска я не могу настроить этот параметр с требуемым значением. Другое дело, что я могу читать те же данные в потоковом режиме.

Я думаю, что это происходит, потому что в пакетном режиме все записи из каждого раздела назначаются на GlobalWindow (часть ProcessContext), которыйсрабатывает только все данные читаются. Это может произойти из-за проблемы OOM.

Если это причина, то как я могу изменить GlobalWindow на PartitioningWindow в ProcessContext?

Если это не причина, тоКак я могу прочитать эти огромные данные из Kafka в пакетном режиме, используя Apache Beam, не увеличивая память исполнителя для каждого запуска?

1 Ответ

0 голосов
/ 12 декабря 2019

Из документации

You can use windowing with fixed-size data sets in bounded PCollections. However, note that windowing considers only the implicit timestamps attached to each element of a PCollection, and data sources that create fixed data sets (such as TextIO) assign the same timestamp to every element. This means that all the elements are by default part of a single, global window.

To use windowing with fixed data sets, you can assign your own timestamps to each element. To assign timestamps to elements, use a ParDo transform with a DoFn that outputs each element with a new timestamp (for example, the WithTimestamps transform in the Beam SDK for Java).

Вот пример того, как определить окна для ограничивающего набора данных - https://beam.apache.org/get-started/wordcount-example/#unbounded-and-bounded-datasets

Также из документации считывается ограниченный набор данных одновременно. и назначил глобальное окно, как описано ниже

The bounded (or unbounded) nature of your PCollection affects how Beam processes your data. A bounded PCollection can be processed using a batch job, which might read the entire data set once, and perform processing in a job of finite length. An unbounded PCollection must be processed using a streaming job that runs continuously, as the entire collection can never be available for processing at any one time.

Я полагаю, что для решения вашей проблемы вы можете попробовать установить окно для ограниченной коллекции и запустить его в облачном потоке данных, чтобы увидеть, работает ли это или нет. Вы также можете обратиться к матрице возможностей луча, чтобы увидеть, что поддерживается с точки зрения Spark Runner.

...