Beam java SDK 2.10.0 с источником Kafka и обработчиком потока данных: оконный метод Count.perElement никогда не запускает данные - PullRequest
0 голосов
/ 26 февраля 2019

У меня проблема с запуском задания Beam SDK to 2.10.0 в Google DataFlow

Поток прост: я использую Kafka в качестве источника, затем применяю Фиксированные окна, затем подсчитываю элемент по ключу.Но похоже, что данные никогда не покидают стадию подсчета, пока работа не будет слита.Выходной набор Count.PerElement/Combine.perKey(Count)/Combine.GroupedValues.out0 всегда равен нулю.Элементы выдаются только после опустошения задания Dataflow.

Вот код:

public KafkaProcessingJob(BaseOptions options) {

    PCollection<GenericRecord> genericRecordPCollection = Pipeline.create(options)
                     .apply("Read binary Kafka messages", KafkaIO.<String, byte[]>read()
                           .withBootstrapServers(options.getBootstrapServers())
                           .updateConsumerProperties(configureConsumerProperties())
                           .withCreateTime(Duration.standardMinutes(1L))
                           .withTopics(inputTopics)
                           .withReadCommitted()
                           .commitOffsetsInFinalize()
                           .withKeyDeserializer(StringDeserializer.class)
                           .withValueDeserializer(ByteArrayDeserializer.class))

                    .apply("Map binary message to Avro GenericRecord", new DecodeBinaryKafkaMessage());

                    .apply("Apply windowing to records", Window.into(FixedWindows.of(Duration.standardMinutes(5)))
                                       .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                                       .discardingFiredPanes()
                                       .withAllowedLateness(Duration.standardMinutes(5)))

                    .apply("Write aggregated data to BigQuery", MapElements.into(TypeDescriptors.strings()).via(rec -> getKey(rec)))
                            .apply(Count.<String>perElement())
                            .apply(
                                new WriteWindowedToBigQuery<>(
                                    project,
                                    dataset,
                                    table,
                                    configureWindowedTableWrite()));   
}

private Map<String, Object> configureConsumerProperties() {
    Map<String, Object> configUpdates = Maps.newHashMap();
    configUpdates.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return configUpdates;
}

private static String getKey(GenericRecord record) {
    //extract key
}

Похоже, поток никогда не покидает стадию .apply(Count.<String>perElement())

Может кто-нибудь помочь?

1 Ответ

0 голосов
/ 01 марта 2019

Я нашел причину.

Это относится к TimestampPolicy, используемому здесь (.withCreateTime(Duration.standardMinutes(1L))).

Из-за присутствия пустых разделов в наших темах Kafka, водяной знак темы никогда не продвигался с использованием TimestampPolicy по умолчанию.Мне нужно было реализовать собственную политику для решения проблемы.

...