Получение периодически KafkaProducerException: не удалось отправить org. apache .kafka.common.errors.TimeoutException - PullRequest
0 голосов
/ 26 апреля 2020

Я получаю сообщение об ошибке ниже - вызвано: org. apache .kafka.common.errors.TimeoutException: Истекает 1 запись (ы), как показано ниже,

 Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1
 record(s) for pipeline-demo-0: 60125 ms has passed since last append
 2020-04-26 16:11:14.927  ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='null' and
 payload='KafkaMessage(message={grx_projectCode=Value(v=demo,
 dataType=STRING), grx_gid=Value(v=5e5207a8-881d-...' to topic
 pipeline-demo and partition 0:
     org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for pipeline-demo-0: 60125 ms has passed since last append
 2020-04-26 16:11:14.927 ERROR i.t.g.c.c.s.i.DumpToKafkaServiceImpl - Dump to kafka exception 
    org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is
 org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
 for pipeline-demo-0: 60125 ms has passed since last append

Попробуйте несколько комбинаций больший тайм-аут и меньший размер пакета с задержкой 0 мс все еще получают эту ошибку.

Конфигурации потребителя:

event.topic=events
consumer.threads=1
max.poll.records=1000
max.poll.interval.ms=120000
max.partition.fetch.bytes=1048576
fetch.max.bytes=524288000
fetch.min.bytes=1
fetch.max.wait.ms=500

Конфигурации производителя:

retries=2
batch.size=100
linger.ms=0
buffer.memory=17179869184
acks=all

код для производителя

@Override
    public void send(String topic, KafkaMessage kafkaMessage, String partitionBy, String correlationId) {
        Integer partition = null;
        if (!StringUtils.isEmpty(partitionBy)) {
            try {
                int numPartitions = template.partitionsFor(topic).size();
                partition = Utils.abs(Utils.murmur2(partitionBy.getBytes())) % numPartitions;
            } catch (Exception e) {
                log.error("Unable to get partitions for topic", e);
            }
        }

        ProducerRecord<Integer, KafkaMessage> record = new ProducerRecord<Integer, KafkaMessage>(topic, partition, null,
                kafkaMessage, null);
        ListenableFuture<SendResult<Integer, KafkaMessage>> future = template.send(record);
        future.addCallback(new ListenableFutureCallback<SendResult<Integer,KafkaMessage>>() {

            @Override
            public void onSuccess(SendResult<Integer, KafkaMessage> result) {
                MeterFactory.getEventsSavedMeter().mark();

            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("Dump to kafka exception ", ex);
                MeterFactory.getEventsSaveFailedMeter().mark();
            }
        });
    }

код для конфигурации, KafkaProducerConfig. java,

public class KafkaProducerConfig {

    @Value("${bootstrap.servers}")
    private String bootstrapServers;

    @Value("${retries}")
    private String retries;

    @Value("${batch.size}")
    private String batchSize;

    @Value("${linger.ms}")
    private String lingerMilliSeconds;

    @Value("${buffer.memory}")
    private String bufferMemory;

    @Value("${acks}")
    private String acks;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMilliSeconds);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<Integer, KafkaMessage> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Integer, KafkaMessage> kafkaTemplate() {
        return new KafkaTemplate<Integer, KafkaMessage>(producerFactory());
    }

}

1 Ответ

0 голосов
/ 27 апреля 2020

Кафка не сразу отправляет записи. Он группирует их и периодически отправляет партии настроенного размера (batchSize & lingerMilliSconds).

Исходя из сообщений об истечении нескольких записей, вы отправляете слишком мало данных без очистки производителя.

...