ChainedTransactionManager, вызывающий AfterCompletion в другом потоке - PullRequest
0 голосов
/ 07 января 2020

Я использую ChainedTransactionManager с KafkaTransactionManager и JpaTransactionManager. И я использую переменную Thread Local для хранения идентификатора.

И у меня есть такой поток:

HTTP call => HTTP Filter (устанавливает TL var) => logi c который отправляет событие Spring => TransactionalEventListener (phase = BeforeCommit) получает его и отправляет его kafka (использует перехватчик для добавления TL в качестве заголовка события и очищает TL)

Вот что должно быть происходит, но это то, что происходит.

... => logi c, который отправляет событие Spring => другое событие принимает поток, завершает, очищает TL => возвращается к слушателю события, получает событие, но на этом этапе TL очищается и публикация кафки завершается неудачей.

Все это происходит в одном потоке. Хотя Spring Events должны быть синхронными (за исключением того, что вы используете @Async, а я нет), TransactionalEventListener вызывается в другом потоке.

Это трассировка стека исключений:

    at org.apache.kafka.clients.producer.internals.ProducerInterceptors.onSend(ProducerInterceptors.java:61)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:855)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:569)
    at io.opentracing.contrib.kafka.TracingKafkaProducer.send(TracingKafkaProducer.java:116)
    at io.opentracing.contrib.kafka.TracingKafkaProducer.send(TracingKafkaProducer.java:97)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:404)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:222)
    at org.springframework.kafka.core.KafkaTemplate.sendDefault(KafkaTemplate.java:200)
    at revisions.messaging.publisher.implementation.RevisionsKafkaPublisher.sendDefault(RevisionsKafkaPublisher.kt:8)
    at revisions.messaging.publisher.implementation.RevisionsEventsPublisher.handle(RevisionsEventsPublisher.kt:17)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:300)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:190)
    at org.springframework.transaction.event.ApplicationListenerMethodTransactionalAdapter$TransactionSynchronizationEventAdapter.processEvent(ApplicationListenerMethodTransactionalAdapter.java:129)
    at org.springframework.transaction.event.ApplicationListenerMethodTransactionalAdapter$TransactionSynchronizationEventAdapter.afterCompletion(ApplicationListenerMethodTransactionalAdapter.java:118)
    at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCompletion(TransactionSynchronizationUtils.java:171)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.invokeAfterCompletion(AbstractPlatformTransactionManager.java:990)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCompletion(AbstractPlatformTransactionManager.java:965)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:786)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:712)
    at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:150)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1418)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1398)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1165)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)

Есть ли способ избежать вызова AfterCompletion в другом потоке? Обработка событий синхронная, нет @Async или что-то подобное. Мне не нужны несколько потоков для обработки запроса / события.

Я думаю, что-то есть с синхронизацией транзакций и ChainedTransactionManager, но я не знаю, что.

Спасибо!

...