Какое значение будет передано для идентификатора транзакции в потоке транзакции api Alpakka - PullRequest
1 голос
/ 06 марта 2020

Используя Alpakka, я хочу использовать записи, используя Transactional.Source Api, и выдавать их в другой topi c, используя Transactional.flow, но в документах говорится, что нам нужно передать транзакцию.

Как мне создать TransactionId для например, следующий код

```via(Transactional.flow(producerSettings, transactionalId))```

Будет ли это для каждой записи производителя или каждого производителя в Alpakka

1 Ответ

0 голосов
/ 23 апреля 2020

Transactional.id играет важную роль в защите зомби. Но поддержание идентификатора, который является единым для всех сеансов производителя, а также правильно защищает зомби, немного сложно.

Ключ к правильному разделению зомби состоит в том, чтобы гарантировать, что входные темы и разделы в процессе чтения-процесса-записи цикл всегда одинаков для данного транзакции. Если это не так, то некоторые сообщения могут просочиться через ограждение, предоставляемое транзакциями.

Например, в приложении обработки распределенного потока предположим, что topi c -partition tp0 был первоначально обработан по транзакции.id T0. Если в какой-то момент позже он может быть сопоставлен другому производителю с транзакцией.id T1, между T0 и T1 не будет ограждения. Таким образом, возможно, что сообщения из tp0 будут повторно обработаны, что нарушает гарантированно единовременную обработку.

Практически, нужно либо сохранить отображение между входными разделами и транзакции.id во внешнем хранилище, либо иметь некоторые STATI c кодирование этого. Kafka Streams выбирает последний подход для решения этой проблемы. Как выполняются транзакции и как их настраивать

Транзакции в Apache Кафка

...