Смещения Кафки не всегда совершаются - PullRequest
0 голосов
/ 29 марта 2019

Я хочу отправить сообщение Кафке и прочитать его без автоматической фиксации.

Я отправляю такие сообщения:

private Producer<Void, String> createProducer(String kafkaBootstrapServers) {
   return new KafkaProducer<>(
       ImmutableMap.of(
           ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers,
           ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()
       ),
       new VoidSerializer(),
       new StringSerializer());
}


private void sendToKafka(int partition, String... messages) {
    try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers)) {
        Arrays.stream(messages).forEach(
            message -> producer.send(new ProducerRecord<>(KAFKA_TOPIC, partition, null, message)));
    }
}

Я прочитал это от Кафки с Apache Beam:

    KafkaIO.<Void, String>read()
            .withBootstrapServers(options.getKafkaBootstrapServers())
            .withProcessingTime()
            .withKeyDeserializer(VoidDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .updateConsumerProperties(new ImmutableMap.Builder<String, Object>()
                .put(ENABLE_AUTO_COMMIT_CONFIG, false)
                .put(AUTO_OFFSET_RESET_CONFIG, options.getKafkaConsumerOffsetReset())
                .put(GROUP_ID_CONFIG, options.getKafkaConsumerGroup())
                .build())
            .withConsumerFactoryFn(new ConsumerFactoryFn())
            .commitOffsetsInFinalize()
            .withReadCommitted();

Но смещения не всегда фиксируются.

Я использую эту команду, чтобы увидеть смещения:

bin/kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server localhost:43881 --topic __consumer_offsets --from-beginning

У меня есть три раздела.

Во-первых, мне нужно отправить более 10 сообщений для раздела, чтобы увидеть смещение для этого раздела (не знаю, почему). Когда у меня был один раздел и я отправил 11 сообщений, все работало хорошо.

[group,test,0]::OffsetAndMetadata(offset=10, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1553866358077, expireTimestamp=Some(1554471158077))

Во-вторых, у меня иногда нет смещений для каждого раздела, даже при отправке 11 сообщений для каждого (после изменения от 1 до 3 разделов).

...