Я использую AWS MSK и хочу включить ACL, но не могу создать тему, когда ACL включены. Я использую инструменты командной строки для всех операций. Вот краткое изложение того, что я делаю:
- Создайте новый кластер
- Создайте тему - это отлично работает
- Включите ACL для client1 на ресурсе =CLUSTER и операция = ALL
- Создать тему с помощью AdminClient (предоставив опцию --bootstrap-server) - это дает исключение тайм-аута
- Повторите попытку создания той же темы - это дает ошибкуговоря, тема уже существует
- Вывести список тем с помощью AdminClient - это не возвращает темы
- Создать тему с помощью Zookeeper connect - это работает
- Вывести список тем с помощью Zookeeper connect - это возвращает всю темуЯ создал (даже те, у которых истекло время ожидания)
Итак, проблема в том, что тема создается в Zookeeper, но брокер не может получить к ней доступ. Предположительно из-за какого-то правила ACL, которое мне не хватает.
Необработанный вывод команд, которые я выполнил:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1
Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
[2019-09-30 17:16:19,389] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted du
e to timeout.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
(kafka.admin.TopicCommand$)
Повторное выполнение той же команды:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1
Error while executing topic command : org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
[2019-09-30 17:25:38,266] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic
'test3' already exists.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
(kafka.admin.TopicCommand$)
Список тем через AdminClient:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties --list
Список тем через Zookeeper connect:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --zookeeper $ZK --command-config ~/client1.properties --list
test
test2
test3
test4
test5
Вот мои правила ACL:
Current ACLs for resource `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=LITERAL)`:
(principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=--operation=All, patternType=LITERAL)`:
(principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)
Чего мне не хватает?