Я пытался реализовать chainedTM (транзакция БД, за которой следует транзакция Kafka). Я следовал документации здесь: https://docs.spring.io/spring-kafka/reference/html/#ex-jdbc-sync.
После реализации я написал тест, в котором намеренно вызываю сбой транзакции Kafka, чтобы убедиться, что транзакция БД откатывается, но этого НЕ происходит. Нужна помощь.
Вот мой код конфигурации
@Configuration
@EnableTransactionManagement
public class KafkaConfig {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConfig.class);
@Autowired
private KafkaProperties kafkaProperties;
/**
 * Builds the producer settings consumed by {@link #producerFactory()}.
 *
 * <p>The map starts as a copy of the properties built by the injected {@code KafkaProperties};
 * idempotence is then switched on and {@code max.request.size} is set to zero.
 *
 * @return a new {@link HashMap} holding the producer configuration
 */
@Bean
public Map<String, Object> producerConfigs() {
    final Map<String, Object> settings = new HashMap<>(kafkaProperties.buildProducerProperties());
    settings.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    // A zero request-size limit makes every send fail with RecordTooLargeException.
    // NOTE(review): this looks like a deliberate fault-injection value for the rollback test --
    // confirm it is not meant for production use.
    settings.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 0);
    return settings;
}
/**
 * Builds the producer settings consumed by {@link #producervdtFactory()}.
 *
 * <p>Same recipe as {@link #producerConfigs()}: a copy of the {@code KafkaProperties} producer
 * properties with idempotence enabled and {@code max.request.size} forced to zero.
 *
 * @return a new {@link HashMap} holding the producer configuration
 */
@Bean
public Map<String, Object> producervdtConfigs() {
    final Map<String, Object> baseProperties = kafkaProperties.buildProducerProperties();
    final Map<String, Object> vdtSettings = new HashMap<>(baseProperties);
    vdtSettings.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    // NOTE(review): a zero limit rejects every record; presumably intentional for the failure
    // test -- confirm.
    vdtSettings.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 0);
    return vdtSettings;
}
/**
 * Creates the producer factory for {@code let} payloads from {@link #producerConfigs()}.
 *
 * <p>A transaction id prefix is assigned from a random UUID generated when this method runs.
 * NOTE(review): a random prefix changes on every application start -- confirm that a stable
 * prefix is not required for this deployment.
 *
 * @return the configured producer factory
 */
@Bean
public ProducerFactory<String, let> producerFactory() {
    // Fully parameterized (the local used to be a raw type), so the return no longer relies on
    // an unchecked conversion.
    final DefaultKafkaProducerFactory<String, let> factory =
            new DefaultKafkaProducerFactory<>(producerConfigs());
    factory.setTransactionIdPrefix(UUID.randomUUID().toString());
    return factory;
}
/**
 * Registers, under the bean name {@code chainedlbrTM}, a transaction manager that chains the JPA
 * transaction manager with the Kafka transaction manager for {@code let} payloads.
 *
 * <p>NOTE(review): ChainedTransactionManager is documented to complete the managers in reverse
 * argument order -- confirm that the JPA-first, Kafka-second order is the intended one.
 *
 * @param transactionManager the JPA transaction manager, passed first to the chain
 * @param kafkaTransactionManager the Kafka transaction manager, passed second to the chain
 * @return the chained transaction manager
 */
@Bean(name = "chainedlbrTM")
public ChainedTransactionManager chainedTransactionManager(JpaTransactionManager transactionManager,
        KafkaTransactionManager<String, let> kafkaTransactionManager) {
    final ChainedTransactionManager chain =
            new ChainedTransactionManager(transactionManager, kafkaTransactionManager);
    return chain;
}
/**
 * Registers, under the bean name {@code chainedvdtTM}, a transaction manager that chains the JPA
 * transaction manager with the Kafka transaction manager for {@code vet} payloads.
 *
 * @param transactionManager the JPA transaction manager, passed first to the chain
 * @param kafkavdtTransactionManager the Kafka transaction manager, passed second to the chain
 * @return the chained transaction manager
 */
@Bean(name = "chainedvdtTM")
public ChainedTransactionManager chainedvdtTransactionManager(JpaTransactionManager transactionManager,
        KafkaTransactionManager<String, vet> kafkavdtTransactionManager) {
    final ChainedTransactionManager vdtChain =
            new ChainedTransactionManager(transactionManager, kafkavdtTransactionManager);
    return vdtChain;
}
/**
 * Creates the {@code KafkaTemplate} for {@code let} payloads on top of
 * {@link #producerFactory()}.
 *
 * @return the template
 */
@Bean
public KafkaTemplate<String, let> kafkaTemplate() {
    final ProducerFactory<String, let> factory = producerFactory();
    return new KafkaTemplate<>(factory);
}
/**
 * Creates the Kafka transaction manager for {@code let} payloads on top of
 * {@link #producerFactory()}, with its transaction synchronization mode set to
 * {@code SYNCHRONIZATION_ON_ACTUAL_TRANSACTION}.
 *
 * @return the configured Kafka transaction manager
 */
@Bean
public KafkaTransactionManager<String, let> kafkaTransactionManager() {
    final KafkaTransactionManager<String, let> manager =
            new KafkaTransactionManager<>(producerFactory());
    manager.setTransactionSynchronization(
            AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return manager;
}
/**
 * Creates the Kafka transaction manager for {@code vet} payloads on top of
 * {@link #producervdtFactory()}, with its transaction synchronization mode set to
 * {@code SYNCHRONIZATION_ON_ACTUAL_TRANSACTION}.
 *
 * @return the configured Kafka transaction manager
 */
@Bean
public KafkaTransactionManager<String, vet> kafkavdtTransactionManager() {
    final KafkaTransactionManager<String, vet> vdtManager =
            new KafkaTransactionManager<>(producervdtFactory());
    vdtManager.setTransactionSynchronization(
            AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return vdtManager;
}
/**
 * Declares the primary transaction manager: a {@code JpaTransactionManager} bound to the given
 * entity manager factory.
 *
 * @param em the entity manager factory the JPA transaction manager is created with
 * @return the JPA transaction manager
 */
@Bean
@Primary
public JpaTransactionManager transactionManager(EntityManagerFactory em) {
    final JpaTransactionManager jpaManager = new JpaTransactionManager(em);
    return jpaManager;
}
/**
 * Creates the producer factory for {@code vet} payloads from {@link #producervdtConfigs()}.
 *
 * <p>A transaction id prefix is assigned from a random UUID generated when this method runs.
 * NOTE(review): a random prefix changes on every application start -- confirm that a stable
 * prefix is not required for this deployment.
 *
 * @return the configured producer factory
 */
@Bean
public ProducerFactory<String, vet> producervdtFactory() {
    // Fully parameterized (the local used to be a raw type), so the return no longer relies on
    // an unchecked conversion.
    final DefaultKafkaProducerFactory<String, vet> factory =
            new DefaultKafkaProducerFactory<>(producervdtConfigs());
    factory.setTransactionIdPrefix(UUID.randomUUID().toString());
    return factory;
}
/**
 * Creates the {@code KafkaTemplate} for {@code vet} payloads on top of
 * {@link #producervdtFactory()}.
 *
 * @return the template
 */
@Bean
public KafkaTemplate<String, vet> kafkavdtTemplate() {
    final ProducerFactory<String, vet> vdtFactory = producervdtFactory();
    return new KafkaTemplate<>(vdtFactory);
}
Мой класс сервиса:
@Service("libraryService")
@EnableTransactionManagement
public class LibraryServiceImpl
implements ILibraryService
{
@Autowired
private LibraryRepository libraryRepository;
@Autowired
private IEventProducer producer;
/**
 * Stores the given library through {@code libraryRepository} and then passes the stored result
 * to {@code producer.libraryCreated}, all inside one transaction.
 *
 * <p>NOTE(review): if {@code LibraryAlreadyExist} is a checked exception, Spring's default
 * rollback rules would not roll back for it -- confirm whether {@code rollbackFor} is needed.
 *
 * @param library the library to create
 * @return the library as returned by the repository
 * @throws LibraryAlreadyExist declared by this method; presumably raised by the repository when
 *     the library already exists -- confirm
 */
@Override
// The qualifier must name an existing transaction manager bean. The chained manager that
// KafkaConfig declares for the "let" Kafka transaction manager is registered as "chainedlbrTM";
// the previous value "chainedLibraryTM" matched no bean in that configuration.
@Transactional(transactionManager = "chainedlbrTM")
public Library createLibrary(final Library library)
        throws LibraryAlreadyExist {
    LOG.debug("Creating library {}:{}", library.getName(), library.getVersion());
    final Library result = libraryRepository.createLibrary(library);
    producer.libraryCreated(result);
    return result;
}
Мой EventProducer:
@Autowired
private KafkaTemplate<String, LibraryEvent> kafkaTemplate;
/**
 * Sends an event of type {@code EventType.CREATED} for the given library.
 *
 * @param library the library the event is about
 */
public void libraryCreated(final Library library) {
    final EventType created = EventType.CREATED;
    sendMessage(created, library);
}
private void sendMessage(final EventType type, final Library library) {
LOG.info("Sending {} event for library {}, topic = {}", type, library.getId(), topic);
final LibraryEvent event = LibraryEvent.newBuilder()
.setLibraryId(library.getId())
.setLibraryName(library.getName())
.setLibraryVersion(library.getVersion())
.setEventType(type)
.build();
LOG.trace("Kafka Record: {}", event.toString());
ProducerRecord<String, LibraryEvent> record = new ProducerRecord<String, LibraryEvent>(topic, library.getId().toString(), event);
ListenableFuture<SendResult<String, LibraryEvent>> future = kafkaTemplate.send(record);
future.addCallback(new ListenableFutureCallback<SendResult<String, LibraryEvent>>(){
@Override
public void onSuccess(SendResult<String, LibraryEvent> result) {
}
@Override
public void onFailure(Throwable ex) {
}
});
if (future.isDone()) {
try {
future.get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new KafkaException("Interrupted", e);
}
catch (ExecutionException e) {
throw new KafkaException("Send failed", e.getCause());
}
}
Мой тест
// Calls libraryService.createLibrary with a library named tid, version "1.0", that carries two
// artifacts ("TestP" tagged P and "TestF" tagged F, both with content "first content"), and
// stores the returned library in library2.
// NOTE(review): no assertion and no closing brace are visible in this excerpt -- confirm against
// the full test how the expected database rollback is verified.
@Test
public void testArt() throws Exception {
library2 = this.libraryService.createLibrary(
new LibraryBuilder()
.name(tid)
.version("1.0")
.artifact(
new ArtifactBuilder()
.name("TestP")
.tag(P)
.content("first content")
.build())
.artifact(
new ArtifactBuilder()
.name("TestF")
.tag(F)
.content("first content")
.build())
.build());
После включения подробного уровня логирования я заметил следующие записи в журнале. Исключение вызвано тем, что я намеренно задал настройку props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 0); — она приводит к сбою транзакции Kafka, что должно было вызвать откат транзакции БД.
{"@timestamp":"2020-04-30T11:18:42.184+05:30","@version":"1","message":"Exception thrown when sending a message with key='1' and payload='{\"libraryId\": 1, \"libraryName\": \"ware\", \"libraryVersion\": \"1.0\", \"eventType\": \"CREATED\"}' to topic pr-rt-deploy:","logger_name":"org.springframework.kafka.support.LoggingProducerListener","thread_name":"Test worker","level":"ERROR","level_value":40000,"stack_trace":"org.apache.kafka.common.errors.RecordTooLargeException: The message is 139 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.\n","tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.194+05:30","@version":"1","message":"Unable to post deploy event for library 1, topic = pr-rt-deploy, due to = Failed to send; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 139 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. .","logger_name":"events.EventProducerImpl","thread_name":"Test worker","level":"INFO","level_value":20000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.195+05:30","@version":"1","message":"Completing transaction for [service.LibraryServiceImpl.createLibrary]","logger_name":"org.springframework.transaction.interceptor.TransactionInterceptor","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.201+05:30","@version":"1","message":"Clearing transaction synchronization","logger_name":"org.springframework.transaction.support.TransactionSynchronizationManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.202+05:30","@version":"1","message":"Removed value [org.springframework.orm.jpa.EntityManagerHolder@421d8c5] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@40617316] from thread [Test worker]","logger_name":"org.springframework.transaction.support.TransactionSynchronizationManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.205+05:30","@version":"1","message":"Initializing transaction synchronization","logger_name":"org.springframework.transaction.support.TransactionSynchronizationManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.205+05:30","@version":"1","message":"Triggering beforeCommit synchronization","logger_name":"org.springframework.kafka.transaction.KafkaTransactionManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.206+05:30","@version":"1","message":"Triggering beforeCompletion synchronization","logger_name":"org.springframework.kafka.transaction.KafkaTransactionManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.206+05:30","@version":"1","message":"Initiating transaction commit","logger_name":"org.springframework.kafka.transaction.KafkaTransactionManager","thread_name":"Test worker","level":"DEBUG","level_value":10000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.213+05:30","@version":"1","message":"Triggering afterCommit synchronization","logger_name":"org.springframework.kafka.transaction.KafkaTransactionManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.213+05:30","@version":"1","message":"Clearing transaction synchronization","logger_name":"org.springframework.transaction.support.TransactionSynchronizationManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.214+05:30","@version":"1","message":"Triggering afterCompletion synchronization","logger_name":"org.springframework.kafka.transaction.KafkaTransactionManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.214+05:30","@version":"1","message":"Removed value [org.springframework.kafka.core.KafkaResourceHolder@31ab2fab] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@1e34605e] from thread [Test worker]","logger_name":"org.springframework.transaction.support.TransactionSynchronizationManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.216+05:30","@version":"1","message":"Resuming suspended transaction after completion of inner transaction","logger_name":"org.springframework.kafka.transaction.KafkaTransactionManager","thread_name":"Test worker","level":"DEBUG","level_value":10000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.216+05:30","@version":"1","message":"Initializing transaction synchronization","logger_name":"org.springframework.transaction.support.TransactionSynchronizationManager","thread_name":"Test worker","level":"TRACE","level_value":5000,"tenantId":"ware","opcRequestId":"1234"}
{"@timestamp":"2020-04-30T11:18:42.247+05:30","@version":"1","message":"DataSourceManager shutdown started ..","logger_name":".infra.datasource.DataSourceManager","thread_name":"Test worker","level":"INFO","level_value":20000}