Я использую реакторную библиотеку для извлечения большого потока данных из сети и отправки его брокеру kafka с использованием подхода реактивной кафки.
Ниже приведен производитель Kafka, который я использую
public class LogProducer {
private final KafkaSender<String, String> sender;
public LogProducer(String bootstrapServers) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "log-producer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptions = SenderOptions.create(props);
sender = KafkaSender.create(senderOptions);
}
public void sendMessages(String topic, Flux<Logs.Data> records) {
AtomicInteger sentCount = new AtomicInteger(0);
AtomicInteger fCount = new AtomicInteger(0);
records.doOnNext(r -> fCount.incrementAndGet()).subscribe();
System.out.println("Total Records: " + fCount);
sender.send(records.doOnNext(r -> sentCount.incrementAndGet())
.map(record -> {
LogRecord lrec = record.getRecords().get(0);
String id = lrec.getId();
return SenderRecord.create(new ProducerRecord<>(topic, id,
lrec.toString()), id);
})).then()
.doOnError(e -> {
log.error("[FAIL]: Send to the topic: '{}' failed. "
+ e, topic);
})
.doOnSuccess(s -> {
log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
})
.subscribe();
}
}
Общее количество записей в Flux (fCount) и записях, отправленных в Kafka topi c (sentCount), не совпадает, не выдает никакой ошибки и успешно завершается.
Для пример: в одном случае общее количество записей в Flux равно 2758, а количество отправленных на kafka счетчиков равно 256. Существует ли какая-либо конфигурация kafka, которую необходимо изменить, или я что-то пропустил?