Проблема при отправке потока данных на реактивную кафку - PullRequest
0 голосов
/ 27 февраля 2020

Я использую реакторную библиотеку для извлечения большого потока данных из сети и отправки его брокеру kafka с использованием подхода реактивной кафки.

Ниже приведен производитель Kafka, который я использую

public class LogProducer {

    private final KafkaSender<String, String> sender;

    public LogProducer(String bootstrapServers) {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "log-producer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        SenderOptions<String, String> senderOptions = SenderOptions.create(props);

        sender = KafkaSender.create(senderOptions);
    }

    public void sendMessages(String topic, Flux<Logs.Data> records) {

        AtomicInteger sentCount = new AtomicInteger(0);
        AtomicInteger fCount = new AtomicInteger(0);

        records.doOnNext(r -> fCount.incrementAndGet()).subscribe();
        System.out.println("Total Records: " + fCount);

        sender.send(records.doOnNext(r -> sentCount.incrementAndGet())
                .map(record -> {
                    LogRecord lrec = record.getRecords().get(0);
                    String id = lrec.getId();
                    return SenderRecord.create(new ProducerRecord<>(topic, id,
                            lrec.toString()), id);
                })).then()
                .doOnError(e -> {
                    log.error("[FAIL]: Send to the topic: '{}' failed. "
                            + e, topic);
                })
                .doOnSuccess(s -> {
                    log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
                })
                .subscribe();
    }

}

Общее количество записей в Flux (fCount) и записях, отправленных в Kafka topi c (sentCount), не совпадает, не выдает никакой ошибки и успешно завершается.

Для пример: в одном случае общее количество записей в Flux равно 2758, а количество отправленных на kafka счетчиков равно 256. Существует ли какая-либо конфигурация kafka, которую необходимо изменить, или я что-то пропустил?

...