Пытаюсь включить опцию 'idempotent' на kafka-console -roduction.Ссылка на следующие ссылки:
Используемая команда:
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list node1.com:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --producer-property acks=all --producer-property retries=Integer.MAX_VALUE --producer-property enable.idempotence=true
Следующее исключениенаблюдается:
org.apache.kafka.common.KafkaException: Не удалось создать производителя кафки в org.apache.kafka.clients.producer.KafkaProducer. (KafkaProducer.java:433) в org.apache.kafka.clients.producer.KafkaProducer. (KafkaProducer.java:291) в kafka.producer.NewShinyProducer. (BaseProducer.scala: 40) в kafka.tools.ConsoleProducer $ .main (ConsoleProducer.scala: 50) в Кафке.tools.ConsoleProducer.main (ConsoleProducer.scala) Причина: org.apache.kafka.common.config.ConfigException: необходимо установить acks для всех, чтобы использовать идемпотентный производитель.В противном случае мы не можем гарантировать идемпотентность.в org.apache.kafka.clients.producer.KafkaProducer.configureAcks (KafkaProducer.java:510) в org.apache.kafka.clients.producer.KafkaProducer. (KafkaProducer.java:375)
Хотя acks уже имеет значение «all», мы наблюдаем это исключение.Что мне не хватает?
Ниже приведены используемые версии версий:
- брокер - 1.0.0
- клиент - производитель консоли в комплекте с брокером 1.0.0
Обновление
Я мог бы включить идемпотентность у производителя консоли, используя опцию --request-required-acks -1
, как предложено в ответе.
Однако,Я получаю ClusterAuthorizationException.
bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list borker1:6667 --topic my_topic --producer-property enable.idempotence=true --request-required-acks -1 --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>[2018-12-26 04:00:56,074] ERROR [Producer clientId=console-producer] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
[2018-12-26 04:00:56,080] ERROR Error when sending message to topic orm_c1_prv_non_sepa_ci with key: 4 bytes, value: 6 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
Это исключение возникает, только если включена опция идемпотентности.Можно создавать сообщения без этой опции.
bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list broker1:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>key2:value2
Чего мне не хватает?