ReplyingKafkaTemplate with KafkaTransactionManager throws an exception
0 votes
/ June 18, 2020

I'm developing a microservice that needs to make a synchronous call within a transaction, using Spring Boot and Kafka. To do this, I'm using ReplyingKafkaTemplate with KafkaTransactionManager. However, ReplyingKafkaTemplate apparently doesn't work with transactions. The Kafka message must be processed transactionally so that it isn't lost if a failure occurs. But as soon as I add the KafkaTransactionManager configuration, an exception is thrown.

Requirements:

Spring Boot: 2.3.1

Kafka: 2.5.0

Spring Kafka: 2.5.2

Scenario 1: When the KafkaTransactionManager bean is added, the application fails at startup with: Error creating bean with name 'kafkaTransactionManager' defined in class path resource.

Configuration: application.yml

kafka:
  bootstrap-servers: localhost:9092
  topic:
    request-topic: request-topic
    request-reply-topic: request-reply-topic
    consumer-group: request-reply-group
  producer:
    transactionIdPrefix: "kafka-tx-"


KafkaSettings.java

@Configuration
public class KafkaSettings {
    @Autowired
    private KafkaProperties kafkaProperties;

    @Value("${kafka.topic.request-reply-topic}")
    private String requestReplyTopic;

    @Value("${kafka.topic.consumer-group}")
    private String consumerGroup;

    @Bean
    public KafkaTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager ktm = new KafkaTransactionManager<>(producerFactory());
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(kafkaProperties.buildConsumerProperties());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "coffeeshop-consumer-group-id");
        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        DefaultKafkaProducerFactory<String, Object> pf = new DefaultKafkaProducerFactory<>(producerConfigs());
        return pf;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        final JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
        jsonDeserializer.addTrustedPackages("*");

        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ReplyingKafkaTemplate<?, ?, ?> replyingKafkaTemplate(ProducerFactory<String, Object> pf, KafkaMessageListenerContainer<String, Object> container){
        return new ReplyingKafkaTemplate(pf, container);
    }

    @Bean
    public KafkaMessageListenerContainer<String, Object> replyContainer(ConsumerFactory<String, Object> cf) {
        ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
        return new KafkaMessageListenerContainer<>(cf, containerProperties);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(kafkaTemplate());
        return factory;
    }
}

CoffeeController.java

@RequestMapping
@RestController
public class CoffeeController {

    @Autowired(required = true)
    private ReplyingKafkaTemplate<String, Coffee, Coffee> replyingKafkaTemplate;

    @Value("${kafka.topic.request-topic}")
    String requestTopic;

    @Value("${kafka.topic.request-reply-topic}")
    String requestReplyTopic;

    private RequestReplyFuture<String, Coffee, Coffee> requestReplyFuture;

    @ResponseBody
    @PostMapping(value = "/coffee/create", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    public Coffee create (@RequestBody Coffee coffee) throws ExecutionException, InterruptedException {
        ProducerRecord<String, Coffee> record = new ProducerRecord<String, Coffee>(requestTopic, coffee);
        record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));

        requestReplyFuture = replyingKafkaTemplate.sendAndReceive(record);

        ConsumerRecord<String, Coffee> consumerRecord = requestReplyFuture.get();
        return consumerRecord.value();
    }
}

CreateCoffee.java

@Component
public class CreateCoffee {
    @KafkaListener(topics = "${kafka.topic.request-topic}")
    @SendTo
    public Coffee listen(Coffee coffee) {
        coffee.setId(UUID.randomUUID());
        return coffee;
    }
}


Coffee.java

@JsonInclude(JsonInclude.Include.NON_NULL)
public class Coffee {
    @JsonProperty("id")
    private UUID id;

    @JsonProperty("price")
    private BigDecimal price;

    @JsonProperty("varietal")
    private String varietal;

    @JsonProperty("quantity")
    private Integer quantity;

    @JsonProperty("amount")
    private BigDecimal amount;


    public UUID getId() {
        return id;
    }

    public void setId(UUID id) {
        this.id = id;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    public String getVarietal() {
        return varietal;
    }

    public void setVarietal(String varietal) {
        this.varietal = varietal;
    }

    public Integer getQuantity() {
        return quantity;
    }

    public void setQuantity(Integer quantity) {
        this.quantity = quantity;
    }

    public BigDecimal getAmount() {
        return price.multiply(BigDecimal.valueOf(quantity));
    }
}

Exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaTransactionManager' defined in class path resource [coffeeshop/kafka/KafkaSettings.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.transaction.KafkaTransactionManager]: Factory method 'kafkaTransactionManager' threw exception; nested exception is java.lang.IllegalArgumentException: The 'ProducerFactory' must support transactions
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:655) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:483) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1338) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1177) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:226) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:893) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:879) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:551) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    at coffeeshop.CoffeeShopApplication.main(CoffeeShopApplication.java:9) ~[classes/:na]
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.transaction.KafkaTransactionManager]: Factory method 'kafkaTransactionManager' threw exception; nested exception is java.lang.IllegalArgumentException: The 'ProducerFactory' must support transactions
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:650) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    ... 20 common frames omitted
Caused by: java.lang.IllegalArgumentException: The 'ProducerFactory' must support transactions
    at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.kafka.transaction.KafkaTransactionManager.<init>(KafkaTransactionManager.java:88) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at coffeeshop.kafka.KafkaSettings.kafkaTransactionManager(KafkaSettings.java:49) ~[classes/:na]
    at coffeeshop.kafka.KafkaSettings$$EnhancerBySpringCGLIB$$aa27e75a.CGLIB$kafkaTransactionManager$2(<generated>) ~[classes/:na]
    at coffeeshop.kafka.KafkaSettings$$EnhancerBySpringCGLIB$$aa27e75a$$FastClassBySpringCGLIB$$498ae027.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244) ~[spring-core-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at coffeeshop.kafka.KafkaSettings$$EnhancerBySpringCGLIB$$aa27e75a.kafkaTransactionManager(<generated>) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    ... 21 common frames omitted

Scenario 2: When the ProducerFactory is configured manually by calling transactionCapable() and setTransactionIdPrefix("coffee-prefix"), the following error occurs:

No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation

@Bean
public ProducerFactory<String, Object> producerFactory() {
    DefaultKafkaProducerFactory<String, Object> pf = new DefaultKafkaProducerFactory<>(producerConfigs());
    pf.transactionCapable();
    pf.setTransactionIdPrefix("coffee-prefix");
    return pf;
}

java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
    at org.springframework.util.Assert.state(Assert.java:73) ~[spring-core-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:636) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:548) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:385) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.sendAndReceive(ReplyingKafkaTemplate.java:323) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.sendAndReceive(ReplyingKafkaTemplate.java:302) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at coffeeshop.web.controllers.CoffeeController.create(CoffeeController.java:39) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105) ~[spring-webmvc-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:879) ~[spring-webmvc-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:793) ~[spring-webmvc-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) ~[spring-webmvc-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) ~[spring-webmvc-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) ~[spring-webmvc-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:660) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.36.jar:9.0.36]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:373) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.36.jar:9.0.36]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

Expected results:

Request:

curl -X POST  http://localhost:8080/coffee/create -d '{"varietal": "Yellow Bourbon","quantity": 3, "price": 30.00}' -H "Content-Type: application/json" | python -m json.tool

Response:

{
   "id": "26eca82d-8bd0-4fd4-a743-5de52f9b6d4f",
   "price": 30.0,
   "varietal": "Yellow Bourbon",
   "quantity": 3,
   "amount": 90.0
}

1 Answer

0 votes
/ June 18, 2020

In the first case, the properties need the spring prefix:

  spring:
    kafka:
      bootstrap-servers: localhost:9092
      topic:
        request-topic: request-topic
        request-reply-topic: request-reply-topic
        consumer-group: request-reply-group
      producer:
        transactionIdPrefix: "kafka-tx-"

In either case, you need to add @Transactional to the create method so that the transaction manager starts a transaction.

Or you can use executeInTransaction():


    @ResponseBody
    @PostMapping(value = "/coffee/create", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    public Coffee create (@RequestBody Coffee coffee) throws ExecutionException, InterruptedException {
        ProducerRecord<String, Coffee> record = new ProducerRecord<String, Coffee>(requestTopic, coffee);
        record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));

        return replyingKafkaTemplate.executeInTransaction(template -> {
            requestReplyFuture = replyingKafkaTemplate.sendAndReceive(record);

            ConsumerRecord<String, Coffee> consumerRecord = requestReplyFuture.get();
            return consumerRecord.value();
        });
    }

The error seems quite clear:

java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record

By the way, it's best to use get(timeout, TimeUnit...) on the future, to avoid hanging indefinitely if, for some reason, no reply comes back.
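As a rough illustration of the bounded wait: the future returned by sendAndReceive() is a java.util.concurrent.Future, so the standard get(timeout, unit) overload applies. The sketch below uses a plain CompletableFuture as a stand-in for the reply future; the class name, the helper awaitReply and the fallback strings are made up for the example and are not part of Spring Kafka.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ReplyTimeoutSketch {

    // Waits for the reply for a bounded time instead of blocking forever.
    // Returns a fallback string when no reply arrives (hypothetical handling;
    // a real controller would more likely map this to an HTTP error).
    static String awaitReply(Future<String> reply, long timeout, TimeUnit unit) {
        try {
            return reply.get(timeout, unit);
        } catch (TimeoutException e) {
            reply.cancel(true); // give up on this request
            return "no reply within timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        }
    }

    public static void main(String[] args) {
        // A future that is never completed stands in for a lost reply.
        Future<String> lost = new CompletableFuture<>();
        System.out.println(awaitReply(lost, 100, TimeUnit.MILLISECONDS));

        // An already completed future stands in for a reply that arrived.
        Future<String> arrived = CompletableFuture.completedFuture("coffee");
        System.out.println(awaitReply(arrived, 100, TimeUnit.MILLISECONDS));
    }
}
```

In the controller from the question, the equivalent change would be calling requestReplyFuture.get(...) with a timeout and declaring or handling TimeoutException.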

EDIT

However, this will never work with a consumer that has isolation.level=read_committed: the consumer won't see the request until it is committed, and we don't commit it until the reply is received, which will never happen, so the transaction ends up being rolled back on the timeout.

sendAndReceive() simply can't be used in a transaction.
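The reasoning above can be illustrated with a toy model. This is not Kafka code: ToyTopic is a hypothetical in-memory stand-in in which appended records become visible to a read_committed reader only after commit(). It shows why a responder never sees a request that the sender keeps uncommitted while waiting for the reply.

```java
import java.util.ArrayList;
import java.util.List;

class ReadCommittedSketch {

    // Toy stand-in for a topic: records are appended immediately, but a
    // read_committed reader only sees those covered by the last commit().
    static class ToyTopic {
        private final List<String> records = new ArrayList<>();
        private int committed = 0;

        void append(String record) { records.add(record); }

        void commit() { committed = records.size(); }

        List<String> pollReadCommitted() {
            return new ArrayList<>(records.subList(0, committed));
        }
    }

    // Returns whether the responder can see the request when it polls.
    static boolean responderSeesRequest(boolean commitBeforePoll) {
        ToyTopic requestTopic = new ToyTopic();
        requestTopic.append("create coffee"); // sent inside an open transaction
        if (commitBeforePoll) {
            requestTopic.commit();
        }
        return !requestTopic.pollReadCommitted().isEmpty();
    }

    public static void main(String[] args) {
        // Sender blocks on the reply before committing: request stays invisible.
        System.out.println(responderSeesRequest(false)); // false
        // Sender commits first, then waits for the reply: request is visible.
        System.out.println(responderSeesRequest(true));  // true
    }
}
```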

The only solution is to do the send yourself and add your own listener to receive the reply (and correlate it with the request).
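A minimal sketch of the correlation part, in plain Java and under stated assumptions: the class ReplyCorrelatorSketch and its methods register and onReply are hypothetical names, payloads are plain strings, and the correlation ID is assumed to travel with the request and come back with the reply (for example in a record header). In a real application the request would be sent and committed in its own transaction, and a listener on the reply topic would call onReply.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ReplyCorrelatorSketch {

    // Requests that have been sent but not yet answered, keyed by correlation ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Call before sending the request; the caller waits on the returned future
    // (with a timeout) after the send has been committed.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Call from the reply listener. Returns false for unknown or late replies,
    // for example when the waiting side has already given up.
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(payload);
        return true;
    }

    public static void main(String[] args) {
        ReplyCorrelatorSketch correlator = new ReplyCorrelatorSketch();
        CompletableFuture<String> reply = correlator.register("req-1");

        // Simulates the reply listener receiving the matching record.
        correlator.onReply("req-1", "coffee created");

        System.out.println(reply.join()); // coffee created
    }
}
```

Entries for requests that time out should also be removed from the map by the waiting side, otherwise they accumulate.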

...