KAFKA: splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE - PullRequest
0 votes
/ 01 May 2020

I am sending 10 messages. 2 messages are "correct", and 1 message is larger than 1 MB and gets rejected by the Kafka broker with a RecordTooLargeException.

I have two doubts: 1) MESSAGE_TOO_LARGE appears only from the second scheduled invocation of the method onwards. On the first invocation, the "splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE" warning does not appear. 2) Why is the retry count not decremented? I set retries = 1.

I invoke the Sender class via Spring Boot's scheduling mechanism, something like this:

@Scheduled(fixedDelay = 30000)
public void process() {
    // sendThem(List<String>) expects the messages to send;
    // how the list is assembled is omitted here
    sender.sendThem(toSend);
}

I am using Spring Boot's KafkaTemplate.

@Configuration
@EnableKafka
public class KakfaConfiguration {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        // props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        // appProps.getJksLocation());
        // props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
        // appProps.getJksPassword());
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, acks);
        config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackOffMsConfig);
        config.put(ProducerConfig.RETRIES_CONFIG, retries);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-99");

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "ktm")
    public KafkaTransactionManager<String, String> kafkaTransactionManager() {
        KafkaTransactionManager<String, String> ktm = new KafkaTransactionManager<>(producerFactory());
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

}
@Component
@EnableTransactionManagement
class Sender {

    @Autowired
    private KafkaTemplate<String, String> template;

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Transactional("ktm")
    public void sendThem(List<String> toSend) throws InterruptedException {
        List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(toSend.size());
        ListenableFutureCallback<SendResult<String, String>> callback = new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOG.info(" message success : " + result.getProducerRecord().value());
                latch.countDown();
            }

            @Override
            public void onFailure(Throwable ex) {
                LOG.error("Message Failed ");
                latch.countDown();
            }
        };

        toSend.forEach(str -> {
            ListenableFuture<SendResult<String, String>> future = template.send("t_101", str);
            futures.add(future); // keep the future so unsent messages can be reported below
            future.addCallback(callback);
        });

        if (latch.await(12, TimeUnit.MINUTES)) {
            LOG.info("All sent ok");
        } else {
            for (int i = 0; i < toSend.size(); i++) {
                if (!futures.get(i).isDone()) {
                    LOG.error("No send result for " + toSend.get(i));
                }
            }
        }
    }
}

I get the following logs:

2020-05-01 15:55:18.346  INFO 6476 --- [   scheduling-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1588328718345
2020-05-01 15:55:18.347  INFO 6476 --- [   scheduling-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-prod-991, transactionalId=prod-991] ProducerId set to -1 with epoch -1
2020-05-01 15:55:18.351  INFO 6476 --- [oducer-prod-991] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-prod-991, transactionalId=prod-991] Cluster ID: bL-uhcXlRSWGaOaSeDpIog
2020-05-01 15:55:48.358  INFO 6476 --- [oducer-prod-991] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-prod-991, transactionalId=prod-991] ProducerId set to 13000 with epoch 10
 Value of kafka template----- 1518752790
2020-05-01 15:55:48.377  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 8 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.379  INFO 6476 --- [oducer-prod-991] com.a.kafkaproducer.producer.Sender  :  message sucess : TTTT0
2020-05-01 15:55:48.379  INFO 6476 --- [oducer-prod-991] com.a.kafkaproducer.producer.Sender  :  message sucess : TTTT1
2020-05-01 15:55:48.511 ERROR 6476 --- [oducer-prod-991] com.a.kafkaproducer.producer.Sender  : Message Failed 
2020-05-01 15:55:48.512 ERROR 6476 --- [oducer-prod-991] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='

2020-05-01 15:55:48.514  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 10 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.518  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 11 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.523  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 12 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.527  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 13 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.531  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 14 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.534  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 15 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.538  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 16 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.542  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 17 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.546  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 18 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE

Then, after some time, the program terminates with the following log:

Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for t_101-0:120000 ms has passed since batch creation

2020-05-01 16:18:31.322  WARN 17816 --- [   scheduling-1] o.s.k.core.DefaultKafkaProducerFactory   : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7085a4dd, txId=prod-991]
2020-05-01 16:18:31.322  INFO 17816 --- [   scheduling-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-prod-991, transactionalId=prod-991] Closing the Kafka producer with timeoutMillis = 5000 ms.
2020-05-01 16:18:31.324  INFO 17816 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Aborting incomplete transaction due to shutdown
 error message here
------ processing done in parent class------

1 Answer

1 vote
/ 01 May 2020

Below is the general picture of the producer workflow.

[diagram: Kafka producer send workflow]

By setting the RETRIES_CONFIG property, we can ensure that in case of failure this producer will try to send the message again.

If a batch is too large, the producer splits it and sends the resulting smaller batches again. In that case the retry count is not decremented.
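The size limits involved can also be made explicit in the producer configuration. A minimal sketch, using plain string keys (the real code would use the `ProducerConfig` constants) and an assumed broker address; `max.request.size` defaults to roughly 1 MB, which is what a single oversized record runs into:

```java
import java.util.Properties;

public class SizeLimits {

    // Builds producer properties with an explicit request-size cap.
    // String keys mirror the ProducerConfig constant names; the values
    // shown are the Kafka defaults (~1 MB, 1 retry for illustration).
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("max.request.size", "1048576");         // producer-side cap per request
        props.put("retries", "1");
        props.put("retry.backoff.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("max.request.size"));
    }
}
```

Note that the broker and topic enforce their own limit (`message.max.bytes` / `max.message.bytes`), so raising only the producer-side value is not enough for a record larger than the broker allows.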

You can go through the source code below and find the scenarios in which the retry count is decremented:

https://github.com/apache/kafka/blob/68ac551966e2be5b13adb2f703a01211e6f7a34b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
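The behaviour can be illustrated with a simplified sketch (this is not the actual Kafka source, just the shape of the logic): on MESSAGE_TOO_LARGE a batch containing more than one record is split and re-enqueued without consuming a retry, while other retriable errors decrement the counter:

```java
public class RetrySketch {

    // Returns the remaining retries after handling one failed batch.
    static int handleFailure(String error, int recordCount, int retriesLeft) {
        if (error.equals("MESSAGE_TOO_LARGE") && recordCount > 1) {
            // The batch is split in half and both halves are re-enqueued.
            // Retries are NOT consumed, which is why the log keeps
            // repeating "(1 attempts left)".
            return retriesLeft;
        }
        // Any other retriable failure consumes one attempt.
        return retriesLeft - 1;
    }

    public static void main(String[] args) {
        System.out.println(handleFailure("MESSAGE_TOO_LARGE", 3, 1));   // split: still 1
        System.out.println(handleFailure("NOT_ENOUGH_REPLICAS", 3, 1)); // retried: 0
    }
}
```

A batch that already holds a single oversized record cannot be split any further, so that record eventually fails with the timeout you see in the logs instead of ever being delivered.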

...