Весенняя Кафка и Сделки - PullRequest
       25

Весенняя Кафка и Сделки

0 голосов
/ 07 сентября 2018

Я бы хотел использовать Spring Kafka с транзакциями, но я не совсем понимаю, как он должен быть настроен и как он работает.

Вот моя конфигурация

    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.ACKS_CONFIG, "all");

Этот конфиг используется в DefaultKafkaProducerFactory с префиксом идентификатора транзакции:

defaultKafkaProducerFactory.setTransactionIdPrefix("my_app.");

Задача 1:

Как мне выбрать этот префикс идентификатора транзакции? Если я правильно понимаю, этот префикс используется Spring для генерации транзакционного идентификатора для каждого созданного производителя.

Почему мы не можем просто использовать "UUID.randomUUID ()?

Задача 2:

Если производитель уничтожен, он сгенерирует новый транзакционный идентификатор. Поэтому в случае сбоя приложения при повторном запуске оно будет использовать старый транзакционный идентификатор.

Это нормально ???

Задача 3:

Я использую приложение, развернутое в облаке, которое можно автоматически масштабировать вверх / вниз. Это означает, что мой префикс не может быть исправлен, так как все мои производители в каждом экземпляре будут иметь транзакционный идентификатор в конфликте.

Должен ли я добавить в него случайную часть? Нужно ли восстанавливать тот же префикс при уменьшении / увеличении экземпляра или сбое и перезапуске?

Задача 4:

И последнее, но не менее важное: мы используем учетные данные для нашей Kafka. Это не похоже на работу:

Current ACLs for resource `TransactionalId:my_app*`:
    User:CN... has Allow permission for operations: All from hosts: *

Как установить списки ACL, зная, что мои транзакционные идентификаторы созданы?

Редактировать 1

После прочтения, если я правильно понял.

Если у вас показание C0 (потребитель) из P0 (раздел). Если брокер начинает перебалансировку потребителя. P0 может быть назначен другому потребителю C1. Этот потребитель C1 должен использовать тот же идентификатор транзакции, что и предыдущий C0, чтобы предотвратить дублирование (фехтование зомби)?

Как вы этого добиваетесь в spring-kafka? Кажется, что идентификатор транзакции не имеет ничего общего с потребителем, поэтому раздел прочитан.

Спасибо

1 Ответ

0 голосов
/ 07 сентября 2018
  1. Вы не можете использовать случайный TID из-за зомби-фехтования - если произойдет сбой сервера, у вас может быть частичная транзакция в теме, которая никогда не будет завершена и больше ничего не будет потребляться из каких-либо разделов с записью для этой транзакции.

  2. То есть по замыслу - по вышеуказанной причине.

  3. Опять же, вы не можете рандомизировать; по вышеуказанной причине.

Cloud Foundry, например, имеет переменную среды, которая указывает индекс экземпляра. Если вы используете облачную платформу, которая не включает в себя что-то подобное, вам придется как-то ее смоделировать. Затем используйте его в идентификаторе транзакции:

spring.kafka.producer.transaction-id-prefix=foo-${instance.index}-
  1. ACL - я не могу на это ответить; Я не знаком с разрешениями Кафки; может быть, лучше задать для этого отдельный вопрос.

  2. Я думаю, нам нужно добавить логику в Spring, чтобы всегда использовать один и тот же идентификатор транзакции для определенной темы / раздела.

https://github.com/spring-projects/spring-kafka/issues/800#issuecomment-419501929

...