Почему KafkaTransactionManager не применяется к этому производителю Spring Cloud Stream Kafka? - PullRequest
0 голосов
/ 27 июня 2019

Я настроил приложение Spring Cloud Stream Kafka для использования транзакций ( полный исходный код доступен на Github ):

spring:
  application:
    name: message-relay-service
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: message-relay-tx-
            producer:
              configuration:
                retries: 1
                acks: all
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer

      bindings:
        output:
          destination: transfer
          contentType: application/*+avro
      schema-registry-client:
        endpoint: http://localhost:8081
      schema:
        avro:
          subjectNamingStrategy: org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
  datasource:
    url: jdbc:h2:tcp://localhost:9090/mem:mydb
    driver-class-name: org.h2.Driver
    username: sa
    password:
  jpa:
    hibernate:
      ddl-auto: create
    database-platform: org.hibernate.dialect.H2Dialect

server:
  port: 8085

Это приложение имеет запланированное задание, которое периодически проверяет записи вотправить в базу данных, используя задачу @Scheduled.Эти методы помечены @Transactional, а основной класс определяет @EnableTransactionManagement.

Однако при отладке кода я понял, что KafkaTransactionManager не выполняется, то есть Кафки неттранзакции на месте.В чем проблема?

@EnableTransactionManagement
@EnableBinding(Source::class)
@EnableScheduling
@SpringBootApplication
class MessageRelayServiceApplication

fun main(args: Array<String>) {
    runApplication<MessageRelayServiceApplication>(*args)
}

---

@Component
class MessageRelay(private val outboxService: OutboxService,
                   private val source: Source) {

    @Transactional
    @Scheduled(fixedDelay = 10000)
    fun checkOutbox() {
        val pending = outboxService.getPending()
        pending.forEach {
            val message = MessageBuilder.withPayload(it.payload)
                    .setHeader(KafkaHeaders.MESSAGE_KEY, it.messageKey)
                    .setHeader(MessageHeaders.CONTENT_TYPE, it.contentType)
                    .build()
            source.output().send(message)
            outboxService.markAsProcessed(it.id)
        }
    }

}

1 Ответ

0 голосов
/ 27 июня 2019

Я не вижу @EnableTransactionManagement в account-service, только в message-relay-service.

В любом случае, ваш сценарий в настоящее время не поддерживается;средство связывания транзакций было разработано для процессоров, в которых потребитель запускает транзакцию, все записи, отправленные в потоке потребителя, участвуют в этой транзакции, потребитель отправляет смещение в транзакцию и затем фиксирует транзакцию.

Он не был разработандля привязок только производителя;пожалуйста, откройте проблему GitHub для связывателя, потому что она должна поддерживаться.

Я не уверен, почему вы не видите запуск транзакции, но, даже если это так, проблема в том, что @Transactional будет использовать Boot auto-конфигурированный KTM (и фабрика производителей), а привязка использует другую фабрику производителей (ту, что в вашей конфигурации).

Даже если транзакция выполняется, производитель не будет в ней участвовать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...