Я отправляю 10 сообщений. 2 сообщения «правильные», а 1 сообщение имеет размер более 1 МБ, который отклоняется брокером Kafka из-за RecordTooLargeException
.
У меня 2 сомнения 1) MESSAGE_TOO_LARGE появляется только тогда, когда планировщик вызывает метод второй раз и далее. Когда метод вызывается впервые при разделении и повторении планировщика (осталось 1 попытка). Ошибка: MESSAGE_TOO_LARGE не появляется. 2) Почему количество попыток не уменьшается. Я дал повтор = 1.
Я вызываю класс Sender с помощью механизма планирования загрузки Spring. Что-то вроде этого
@Scheduled(fixedDelay = 30000)
public void process() {
sender.sendThem();
}
Я использую Spring Boot KafkaTemplate.
@Configuration
@EnableKafka
public class KakfaConfiguration {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
// props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
// props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
// appProps.getJksLocation());
// props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
// appProps.getJksPassword());
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, acks);
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackOffMsConfig);
config.put(ProducerConfig.RETRIES_CONFIG, retries);
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-99");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean(name = "ktm")
public KafkaTransactionManager kafkaTransactionManager() {
KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());
ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return ktm;
}
}
@Component
@EnableTransactionManagement
class Sender {
@Autowired
private KafkaTemplate<String, String> template;
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
@Transactional("ktm")
public void sendThem(List<String> toSend) throws InterruptedException {
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(toSend.size());
ListenableFutureCallback<SendResult<String, String>> callback = new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOG.info(" message sucess : " + result.getProducerRecord().value());
latch.countDown();
}
@Override
public void onFailure(Throwable ex) {
LOG.error("Message Failed ");
latch.countDown();
}
};
toSend.forEach(str -> {
ListenableFuture<SendResult<String, String>> future = template.send("t_101", str);
future.addCallback(callback);
});
if (latch.await(12, TimeUnit.MINUTES)) {
LOG.info("All sent ok");
} else {
for (int i = 0; i < toSend.size(); i++) {
if (!futures.get(i).isDone()) {
LOG.error("No send result for " + toSend.get(i));
}
}
}
Я получаю следующие журналы
2020-05-01 15:55:18.346 INFO 6476 --- [ scheduling-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1588328718345
2020-05-01 15:55:18.347 INFO 6476 --- [ scheduling-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-prod-991, transactionalId=prod-991] ProducerId set to -1 with epoch -1
2020-05-01 15:55:18.351 INFO 6476 --- [oducer-prod-991] org.apache.kafka.clients.Metadata : [Producer clientId=producer-prod-991, transactionalId=prod-991] Cluster ID: bL-uhcXlRSWGaOaSeDpIog
2020-05-01 15:55:48.358 INFO 6476 --- [oducer-prod-991] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-prod-991, transactionalId=prod-991] ProducerId set to 13000 with epoch 10
Value of kafka template----- 1518752790
2020-05-01 15:55:48.377 WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 8 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.379 INFO 6476 --- [oducer-prod-991] com.a.kafkaproducer.producer.Sender : message sucess : TTTT0
2020-05-01 15:55:48.379 INFO 6476 --- [oducer-prod-991] com.a.kafkaproducer.producer.Sender : message sucess : TTTT1
2020-05-01 15:55:48.511 ERROR 6476 --- [oducer-prod-991] com.a.kafkaproducer.producer.Sender : Message Failed
2020-05-01 15:55:48.512 ERROR 6476 --- [oducer-prod-991] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='
2020-05-01 15:55:48.514 WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 10 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.518 WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 11 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.523 WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 12 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.527 WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 13 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.531 WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 14 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.534 WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 15 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.538 WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 16 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.542 WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 17 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.546 WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 18 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
Затем через некоторое время программа завершает работу с следующим журналом
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for t_101-0:120000 ms has passed since batch creation
2020-05-01 16:18:31.322 WARN 17816 --- [ scheduling-1] o.s.k.core.DefaultKafkaProducerFactory : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7085a4dd, txId=prod-991]
2020-05-01 16:18:31.322 INFO 17816 --- [ scheduling-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-prod-991, transactionalId=prod-991] Closing the Kafka producer with timeoutMillis = 5000 ms.
2020-05-01 16:18:31.324 INFO 17816 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-prod-991, transactionalId=prod-991] Aborting incomplete transaction due to shutdown
error messahe here
------ processing done in parent class------