Я хочу отправить сообщение Кафке и прочитать его без автоматической фиксации.
Я отправляю такие сообщения:
private Producer<Void, String> createProducer(String kafkaBootstrapServers) {
return new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers,
ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()
),
new VoidSerializer(),
new StringSerializer());
}
private void sendToKafka(int partition, String... messages) {
try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers)) {
Arrays.stream(messages).forEach(
message -> producer.send(new ProducerRecord<>(KAFKA_TOPIC, partition, null, message)));
}
}
Я прочитал это от Кафки с Apache Beam:
KafkaIO.<Void, String>read()
.withBootstrapServers(options.getKafkaBootstrapServers())
.withProcessingTime()
.withKeyDeserializer(VoidDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(new ImmutableMap.Builder<String, Object>()
.put(ENABLE_AUTO_COMMIT_CONFIG, false)
.put(AUTO_OFFSET_RESET_CONFIG, options.getKafkaConsumerOffsetReset())
.put(GROUP_ID_CONFIG, options.getKafkaConsumerGroup())
.build())
.withConsumerFactoryFn(new ConsumerFactoryFn())
.commitOffsetsInFinalize()
.withReadCommitted();
Но смещения не всегда фиксируются.
Я использую эту команду, чтобы увидеть смещения:
bin/kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server localhost:43881 --topic __consumer_offsets --from-beginning
У меня есть три раздела.
Во-первых, мне нужно отправить более 10 сообщений для раздела, чтобы увидеть смещение для этого раздела (не знаю, почему). Когда у меня был один раздел и я отправил 11 сообщений, все работало хорошо.
[group,test,0]::OffsetAndMetadata(offset=10, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1553866358077, expireTimestamp=Some(1554471158077))
Во-вторых, у меня иногда нет смещений для каждого раздела, даже при отправке 11 сообщений для каждого (после изменения от 1 до 3 разделов).