Подтверждение Кафка Производитель Apache Луч - PullRequest
0 голосов
/ 22 марта 2020

Как мне получить записи, в которых подтверждение было получено в apache beam KafkaIO?

По сути, я хочу, чтобы все записи, в которых я не получил никакого подтверждения go, были отправлены в таблицу больших запросов, так что что я могу повторить позже. Я использовал следующий фрагмент кода из документации

    .apply(KafkaIO.<Long, String>read()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       .withTopic("my_topic")  // use withTopics(List<String>) to read from multiple topics.
       .withKeyDeserializer(LongDeserializer.class)
       .withValueDeserializer(StringDeserializer.class)

       // Above four are required configuration. returns PCollection<KafkaRecord<Long, String>>

       // Rest of the settings are optional :

       // you can further customize KafkaConsumer used to read the records by adding more
       // settings for ConsumerConfig. e.g :
       .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))

       // set event times and watermark based on LogAppendTime. To provide a custom
       // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
       .withLogAppendTime()

       // restrict reader to committed messages on Kafka (see method documentation).
       .withReadCommitted()

       // offset consumed by the pipeline can be committed back.
       .commitOffsetsInFinalize()

       // finally, if you don't need Kafka metadata, you can drop it.g
       .withoutMetadata() // PCollection<KV<Long, String>>
    )
    .apply(Values.<String>create()) // PCollection<String>

1 Ответ

1 голос
/ 24 марта 2020

По умолчанию Beam IOs предназначены для продолжения попыток записи / чтения / обработки элементов до. (Пакетные конвейеры не будут работать после повторных ошибок)

То, на что вы ссылаетесь, обычно называется Очередь мертвых писем , чтобы взять ошибочные записи и добавить их в PCollection, Pubsub topi c, служба очередей и др. c. Это часто возможно по желанию, так как позволяет потоковому конвейеру совершать прогресс (не блокировать), когда встречаются ошибки при записи некоторых записей, но допускает записи, которые успешно записываются.

К сожалению, если я не ошибаюсь в IO Кафки нет очереди недоставленных сообщений. Может быть возможно изменить KafkaIO для поддержки этого. Были некоторые обсуждения списка рассылки луча с некоторыми идеями, предложенными для реализации этого, которые могли бы иметь некоторые идеи .

Я подозреваю, что можно было бы добавить это к KafkaWriter , перехватывает записи, которые не удалось, и выводит их в другую коллекцию PC. Если вы решите реализовать это, пожалуйста, также обратитесь к списку рассылки сообщества , если вы хотите помочь объединить его с master, они смогут убедиться, что изменение охватывает необходимые требования, чтобы оно могло быть объединены и имеют смысл в целом для луча.

Затем ваш конвейер может записать их в другом месте (то есть в другом источнике). Конечно, если этот вторичный источник одновременно имеет сбой / проблему, вам потребуется еще один DLQ.

...