В задании Google Dataflow используется библиотека KafkaIO от Apache Beam с AvroIO и Windowed. Запись записи в файлы ".avro" в корзине Google Cloud Storage. Однако по умолчанию используется Streaming в качестве типа задания обработки для производственных данных.
Можно ли использовать данные из раздела Kafka, используя KafkaIO, в потоке данных, используя Пакетная обработка . Это задание потока данных не требует обработки в реальном времени (потоковой передачи). Есть ли способ также вставлять входящие записи в таблицу BigQuery без потоковой вставки затрат, позволяющих обработку типа пакета.
Пакетная обработка с менее частыми запусками может работать, приводя к меньшему объему памяти, виртуальных ЦП и вычислительным затратам.
Согласно: https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
Источник KafkaIO возвращает неограниченную коллекцию записей Kafka в виде PCollection>.
Означает ли это, что Кафку, являющуюся неограниченным источником, нельзя запускать в пакетном режиме?
Проверка условия .withMaxNumRecords (1000000) запускает задание в пакетном режиме. Тем не менее, чтобы запустить задание в режиме реального времени, мне нужно удалить это условие.
Я попытался использовать управление окнами и установить флаг параметров режима потоковой передачи на false, но безуспешно, как показано в приведенном ниже коде.
// not streaming mode
options.setStreaming(false);
...
PCollection<String> collection = p.apply(KafkaIO.<String, String>read()
.withBootstrapServers("IPADDRESS:9092")
.withTopic(topic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(props)
.withConsumerFactoryFn(new ConsumerFactory())
// .withMaxNumRecords(1000000)
.withoutMetadata()
).apply(Values.<String>create())
.apply(Window.into(FixedWindows.of(Duration.standardDays(1))));
...
//convert to Avro GenericRecord
.apply("AvroToGCS", AvroIO.writeGenericRecords(AVRO_SCHEMA)
.withWindowedWrites()
.withNumShards(1)
.to("gs://BUCKET/FOLDER/")
.withSuffix(".avro"));
Код привел к Потоковому типу задания с 4 виртуальными ЦП и 1 рабочим в течение 9 минут, обрабатывающих 1,8 млн. Записей. После этого мне пришлось прекратить работу (утечка), чтобы предотвратить расходы.
Обеспечивая возможность пакетной обработки в потоке данных для входящих данных, можно ли собирать партии записей, записывая их в виде файлов avro, и продолжать это до тех пор, пока смещение не достигнет последнего .
Любые примеры или примеры кода приветствуются.