Как включить идемпотентность в кафке-консоли-производителе? - PullRequest
0 голосов
/ 24 декабря 2018

Пытаюсь включить опцию 'idempotent' на kafka-console -roduction.Ссылка на следующие ссылки:

Используемая команда:

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list node1.com:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --producer-property acks=all --producer-property retries=Integer.MAX_VALUE --producer-property enable.idempotence=true

Следующее исключениенаблюдается:

org.apache.kafka.common.KafkaException: Не удалось создать производителя кафки в org.apache.kafka.clients.producer.KafkaProducer. (KafkaProducer.java:433) в org.apache.kafka.clients.producer.KafkaProducer. (KafkaProducer.java:291) в kafka.producer.NewShinyProducer. (BaseProducer.scala: 40) в kafka.tools.ConsoleProducer $ .main (ConsoleProducer.scala: 50) в Кафке.tools.ConsoleProducer.main (ConsoleProducer.scala) Причина: org.apache.kafka.common.config.ConfigException: необходимо установить acks для всех, чтобы использовать идемпотентный производитель.В противном случае мы не можем гарантировать идемпотентность.в org.apache.kafka.clients.producer.KafkaProducer.configureAcks (KafkaProducer.java:510) в org.apache.kafka.clients.producer.KafkaProducer. (KafkaProducer.java:375)

Хотя acks уже имеет значение «all», мы наблюдаем это исключение.Что мне не хватает?

Ниже приведены используемые версии версий:

  • брокер - 1.0.0
  • клиент - производитель консоли в комплекте с брокером 1.0.0

Обновление

Я мог бы включить идемпотентность у производителя консоли, используя опцию --request-required-acks -1, как предложено в ответе.

Однако,Я получаю ClusterAuthorizationException.

bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list borker1:6667 --topic my_topic --producer-property enable.idempotence=true  --request-required-acks -1  --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>[2018-12-26 04:00:56,074] ERROR [Producer clientId=console-producer] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
[2018-12-26 04:00:56,080] ERROR Error when sending message to topic orm_c1_prv_non_sepa_ci with key: 4 bytes, value: 6 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.

Это исключение возникает, только если включена опция идемпотентности.Можно создавать сообщения без этой опции.

bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list broker1:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>key2:value2

Чего мне не хватает?

1 Ответ

0 голосов
/ 25 декабря 2018

Вы не можете установить от acks до producer-property для ConsoleProducer.Вместо этого используйте request-required-acks, как показано ниже:

bin / kafka-console-producer.sh --broker-list localhost: 9092 --topic test --producer-property enable.idempotence = true--request-required-acks -1

...