Я пытаюсь сделать моего производителя кафки транзакционным. Я отправляю 10 сообщений. Если возникает какая-либо ошибка, никакое сообщение не следует отправлять на kafka, т. Е. На все или на все.
Я использую Spring Boot KafkaTemplate.
@Configuration
@EnableKafka
public class KakfaConfiguration {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
// props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
// props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
// appProps.getJksLocation());
// props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
// appProps.getJksPassword());
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, acks);
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackOffMsConfig);
config.put(ProducerConfig.RETRIES_CONFIG, retries);
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-99");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean(name = "ktm")
public KafkaTransactionManager kafkaTransactionManager() {
KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());
ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return ktm;
}
}
Я отправляю 10 сообщений, например ниже, как указано в документе. Должно быть отправлено 9 сообщений, и мое сообщение имеет размер более 1 МБ, который отклоняется брокером Kafka из-за RecordTooLargeException
https://docs.spring.io/spring-kafka/reference/html/#using -kafkatransactionmanager
@Component
@EnableTransactionManagement
class Sender {
@Autowired
private KafkaTemplate<String, String> template;
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
@Transactional("ktm")
public void sendThem(List<String> toSend) throws InterruptedException {
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(toSend.size());
ListenableFutureCallback<SendResult<String, String>> callback = new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOG.info(" message sucess : " + result.getProducerRecord().value());
latch.countDown();
}
@Override
public void onFailure(Throwable ex) {
LOG.error("Message Failed ");
latch.countDown();
}
};
toSend.forEach(str -> {
ListenableFuture<SendResult<String, String>> future = template.send("t_101", str);
future.addCallback(callback);
});
if (latch.await(12, TimeUnit.MINUTES)) {
LOG.info("All sent ok");
} else {
for (int i = 0; i < toSend.size(); i++) {
if (!futures.get(i).isDone()) {
LOG.error("No send result for " + toSend.get(i));
}
}
}
Но когда я вижу топи c t_hello_world 9 сообщений там. Я ожидал увидеть 0 сообщений, так как мой производитель транзакционен. Как мне этого добиться?
Я получаю следующие журналы
2020-04-30 18:04:36.036 ERROR 18688 --- [ scheduling-1] o.s.k.core.DefaultKafkaProducerFactory : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1eb5a312, txId=prod-990]
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:923) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:297) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1013) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:296) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:713) ~[kafka-clients-2.4.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
2020-04-30 18:04:36.037 WARN 18688 --- [ scheduling-1] o.s.k.core.DefaultKafkaProducerFactory : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1eb5a312, txId=prod-990]
2020-04-30 18:04:36.038 INFO 18688 --- [ scheduling-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-prod-990, transactionalId=prod-990] Closing the Kafka producer with timeoutMillis = 5000 **ms.
2020-04-30 18:04:36.038 INFO 18688 --- [oducer-prod-990] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-prod-990, transactionalId=prod-990] Aborting incomplete transaction due to shutdown**