Использовать KafkaIO в режиме пакетной обработки с использованием Google Dataflow - PullRequest
1 голос
/ 26 марта 2019

В задании Google Dataflow используется библиотека KafkaIO от Apache Beam с AvroIO и Windowed. Запись записи в файлы ".avro" в корзине Google Cloud Storage. Однако по умолчанию используется Streaming в качестве типа задания обработки для производственных данных.

Можно ли использовать данные из раздела Kafka, используя KafkaIO, в потоке данных, используя Пакетная обработка . Это задание потока данных не требует обработки в реальном времени (потоковой передачи). Есть ли способ также вставлять входящие записи в таблицу BigQuery без потоковой вставки затрат, позволяющих обработку типа пакета.

Пакетная обработка с менее частыми запусками может работать, приводя к меньшему объему памяти, виртуальных ЦП и вычислительным затратам.

Согласно: https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/kafka/KafkaIO.html

Источник KafkaIO возвращает неограниченную коллекцию записей Kafka в виде PCollection>.

Означает ли это, что Кафку, являющуюся неограниченным источником, нельзя запускать в пакетном режиме?

Проверка условия .withMaxNumRecords (1000000) запускает задание в пакетном режиме. Тем не менее, чтобы запустить задание в режиме реального времени, мне нужно удалить это условие.

Я попытался использовать управление окнами и установить флаг параметров режима потоковой передачи на false, но безуспешно, как показано в приведенном ниже коде.


// not streaming mode
options.setStreaming(false);

...
PCollection<String> collection = p.apply(KafkaIO.<String, String>read()
               .withBootstrapServers("IPADDRESS:9092")
               .withTopic(topic)                                
               .withKeyDeserializer(StringDeserializer.class)
               .withValueDeserializer(StringDeserializer.class)                
               .updateConsumerProperties(props)
               .withConsumerFactoryFn(new ConsumerFactory())                            
//             .withMaxNumRecords(1000000)
               .withoutMetadata() 
       ).apply(Values.<String>create())
       .apply(Window.into(FixedWindows.of(Duration.standardDays(1))));


...
//convert to Avro GenericRecord

.apply("AvroToGCS", AvroIO.writeGenericRecords(AVRO_SCHEMA)
.withWindowedWrites()
.withNumShards(1)
.to("gs://BUCKET/FOLDER/")
.withSuffix(".avro"));

Код привел к Потоковому типу задания с 4 виртуальными ЦП и 1 рабочим в течение 9 минут, обрабатывающих 1,8 млн. Записей. После этого мне пришлось прекратить работу (утечка), чтобы предотвратить расходы.

Обеспечивая возможность пакетной обработки в потоке данных для входящих данных, можно ли собирать партии записей, записывая их в виде файлов avro, и продолжать это до тех пор, пока смещение не достигнет последнего .

Любые примеры или примеры кода приветствуются.

1 Ответ

1 голос
/ 30 марта 2019

Неограниченные источники не могут быть запущены в пакетном режиме.Это сделано специально, так как пакетные конвейеры ожидают, что будет прочитано конечное количество данных, и прекратят работу после их обработки.

Однако вы можете преобразовать неограниченные источники в ограниченный источник, ограничив количество читаемых записей., что вы сделали.Примечание: нет никакой гарантии, какие записи будут прочитаны.

Потоковые конвейеры должны быть всегда включены, чтобы они были доступны для чтения живых данных.Пакетные конвейеры предназначены для чтения резервных копий хранимых данных.

Пакетный конвейер не будет хорошо реагировать на чтение оперативных данных, он будет читать любые данные, которые есть при запуске конвейера, а затем завершаться.

...