SendTransactionally KafkaSender начинает новую транзакцию во время предыдущей транзакции - PullRequest
0 голосов
/ 20 сентября 2019

Я пользуюсь Кафкой 2.3.0.У меня есть реактивный контроллер, который должен просто отправлять записи в тему.Я попытался настроить один раз доставку и добавить в мои свойства следующие значения:

  processing.guarantee: exactly_once
  isolation.level: read_committed
  enable.auto.commit: false
  linger.ms: 100
  producer.linger.ms: 100
  fetch.max.wait.ms: 100
  partition-count: 16
  replication.factor: 1
  max.in.flight.requests.per.connection: 1

Я устанавливаю транзакцию при создании отправителя.Есть метод, который отправляет часть запроса в тему:

protected Mono<SenderResult<String>> sendToTopic(@NonNull final String correlationId,
        @NonNull final AbstractCommandRequest request) {
    return Flux
        .just(SenderRecord.create(
                new ProducerRecord<>(internalCoreTopic,
                        new ProjectChannelKey(request.getProjectId(), request.getChannelId()), request),
                correlationId))
        .subscriberContext(ctx -> addMDC(request, ctx))
        .flatMap(records -> sender.sendTransactionally(Flux.just(Flux.just(records))))
        .concatMap(r -> r)
        .reduce((a, b) -> {
            if (b.exception() != null) {
                return b;
            }
            return a;
        });
} 

Он работает, но отправитель не ждет, пока предыдущая транзакция будет принята.И иногда я получаю это исключение:

 12:01:43.482 [1ingress-forwarders-tridproject-internal-core-2] ERROR c.l.k.m.c.i.rest.IngressController corr.id=009d0d8a-7e58-4f04-a93d-5540047db732/1568970103473:4 - TransactionalId 1ingress-forwarders-tridproject-internal-core: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
    org.apache.kafka.common.KafkaException: TransactionalId 1ingress-forwarders-tridproject-internal-core: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:759)
        at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:752)
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:217)
        at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:643)
        at reactor.kafka.sender.internals.DefaultKafkaSender$DefaultTransactionManager.lambda$null$0(DefaultKafkaSender.java:485)
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:53)
        at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)
        at reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:61)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
        at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:353)
        at com.project.common.config.mdc.MdcContextLifter.onNext(MdcContextLifter.java:33)
        at reactor.core.publisher.MonoPublishOn$PublishOnSubscriber.run(MonoPublishOn.java:178)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:50)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:27)
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
        at java.util.concurrent.FutureTask.run(FutureTask.java)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
    Assembly trace from producer [reactor.core.publisher.MonoLift] :
        reactor.core.publisher.Mono.create(Mono.java:183)
        reactor.kafka.sender.internals.DefaultKafkaSender$DefaultTransactionManager.lambda$begin$1(DefaultKafkaSender.java:484)
    Error has been observed by the following operator(s):
        |_  Mono.create ⇢ reactor.kafka.sender.internals.DefaultKafkaSender$DefaultTransactionManager.lambda$begin$1(DefaultKafkaSender.java:484)
        |_  Mono.flatMap ⇢ reactor.kafka.sender.internals.DefaultKafkaSender$DefaultTransactionManager.begin(DefaultKafkaSender.java:484)
        |_  Mono.then ⇢ com.project.kafka.queues.TopicDescriptor$KafkaSenderWrapper.transaction(TopicDescriptor.java:211)
        |_  Mono.concatWith ⇢ com.project.kafka.queues.TopicDescriptor$KafkaSenderWrapper.transaction(TopicDescriptor.java:212)
        |_  Flux.thenMany ⇢ com.project.kafka.queues.TopicDescriptor$KafkaSenderWrapper.transaction(TopicDescriptor.java:213)
        |_  Flux.thenMany ⇢ com.project.kafka.queues.TopicDescriptor$KafkaSenderWrapper.transaction(TopicDescriptor.java:213)
        |_  Flux.concatWith ⇢ com.project.kafka.queues.TopicDescriptor$KafkaSenderWrapper.transaction(TopicDescriptor.java:214)
        |_  Flux.concatWith ⇢ com.project.kafka.queues.TopicDescriptor$KafkaSenderWrapper.transaction(TopicDescriptor.java:216)
        |_  Flux.concatWith ⇢ com.project.kafka.queues.TopicDescriptor$KafkaSenderWrapper.transaction(TopicDescriptor.java:217)
        |_  Flux.concatWith ⇢ com.project.kafka.queues.TopicDescriptor$KafkaSenderWrapper.transaction(TopicDescriptor.java:218)
        |_  Mono.error ⇢ com.project.kafka.queues.TopicDescriptor$KafkaSenderWrapper.lambda$transaction$13(TopicDescriptor.java:232)
        |_  Flux.onErrorResume ⇢ com.project.kafka.queues.TopicDescriptor$KafkaSenderWrapper.transaction(TopicDescriptor.java:226)
        |_  Flux.publishOn ⇢ com.project.kafka.queues.TopicDescriptor$KafkaSenderWrapper.transaction(TopicDescriptor.java:239)
        |_  Flux.concatMapDelayError ⇢ com.project.kafka.queues.TopicDescriptor$KafkaSenderWrapper.sendTransactionally(TopicDescriptor.java:154)
        |_  Flux.concatMap ⇢ com.project.kafka.queues.rest.AbstractKafkaRestController.sendToTopic(AbstractKafkaRestController.java:73)
        |_  Flux.reduce ⇢ com.project.kafka.queues.rest.AbstractKafkaRestController.sendToTopic(AbstractKafkaRestController.java:74)
        |_  Mono.map ⇢ com.project.kafka.ms.client.ingress.rest.IngressController.sendCommandRequestToInternalCore(IngressController.java:263)
        |_  Mono.then ⇢ com.project.kafka.ms.client.ingress.rest.IngressController.lambda$execCommand$7(IngressController.java:130)
        |_  Mono.flatMap ⇢ com.project.kafka.ms.client.ingress.rest.IngressController.execCommand(IngressController.java:103)
        |_  Mono.cast ⇢ com.project.kafka.ms.client.ingress.rest.IngressController.execCommand(IngressController.java:133)

Как заставить транзакцию ждать, пока предыдущая транзакция будет зафиксирована или прервана?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...