AWS MSK - время ожидания при создании темы Kafka с включенным ACL - PullRequest
1 голос
/ 30 сентября 2019

Я использую AWS MSK и хочу включить ACL, но не могу создать тему, когда ACL включены. Я использую инструменты командной строки для всех операций. Вот краткое изложение того, что я делаю:

  • Создайте новый кластер
  • Создайте тему - это отлично работает
  • Включите ACL для client1 на ресурсе =CLUSTER и операция = ALL
  • Создать тему с помощью AdminClient (предоставив опцию --bootstrap-server) - это дает исключение тайм-аута
  • Повторите попытку создания той же темы - это дает ошибкуговоря, тема уже существует
  • Вывести список тем с помощью AdminClient - это не возвращает темы
  • Создать тему с помощью Zookeeper connect - это работает
  • Вывести список тем с помощью Zookeeper connect - это возвращает всю темуЯ создал (даже те, у которых истекло время ожидания)

Итак, проблема в том, что тема создается в Zookeeper, но брокер не может получить к ней доступ. Предположительно из-за какого-то правила ACL, которое мне не хватает.

Необработанный вывод команд, которые я выполнил:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1

Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
[2019-09-30 17:16:19,389] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted du
e to timeout.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
        at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
        at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
 (kafka.admin.TopicCommand$)

Повторное выполнение той же команды:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1

Error while executing topic command : org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
[2019-09-30 17:25:38,266] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic
'test3' already exists.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
        at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
        at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
 (kafka.admin.TopicCommand$)

Список тем через AdminClient:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties --list


Список тем через Zookeeper connect:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --zookeeper $ZK --command-config ~/client1.properties --list
test
test2
test3
test4
test5

Вот мои правила ACL:

Current ACLs for resource `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=LITERAL)`:
        (principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=--operation=All, patternType=LITERAL)`:
        (principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)

Чего мне не хватает?

1 Ответ

1 голос
/ 30 сентября 2019

Я не думаю, что это имеет какое-либо отношение к AWS MSK, и это скорее проблема конфигурации вашего защищенного кластера Kafka. Как для клиентов (подписчиков / производителей), так и для действий между брокерами требуется авторизация в защищенном кластере. У вас возникла бы та же проблема в неуправляемом кластере Kafka.

Рекомендуется настроить на сервере «пользователя-суперпользователя» (я бы назвал его служебными учетными записями), а затем дать «суперпользователю». "пользовательские списки ACL, которые разрешают межброкерские взаимодействия, необходимые для вашего кластера. Точные ACL, которые вам нужны, будут варьироваться в зависимости от ваших вариантов использования и предпочтений безопасности.

В server.properties вы добавите запись, например super.users=User:BrokerService, и документально подтверждена на https://docs.confluent.io/current/kafka/authorization.html#kafka-auth-superuser.. документация предполагает использование Алисы и Боба в качестве имен суперпользователя, что меня смущает. Выберите любое имя пользователя, которое имеет смысл для вас.

Затем вам нужно настроить аналогичный ACL, который использует принцип имени пользователя с пользователем «superuser», которого вы создали выше, например, principal=User:BrokerService. ACL даст все необходимые разрешения брокерам. Ваш непосредственный вариант использования - РАЗРЕШИТЬ ПРОЧИТАТЬ все темы, на которые это похоже. Вероятно, вам также понадобятся другие списки ACL для взаимодействия между брокерами, но я не могу точно сказать, что вам нужно, без дополнительной информации о том, что вы хотите сделать.

Например, эта команда для настройки ACL.

kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:BrokerService --operation All --topic '*' --cluster

Дополнительные параметры для настройки ACL и описание вашей точной проблемы описаны здесь https://docs.confluent.io/current/kafka/authorization.html#acl-format

Снова, пожалуйста, изучите еще немного или отредактируйте свой вопрос, если вы ищететочная конфигурация, которую следует использовать здесь, поскольку существуют последствия для безопасности и вариантов использования для того, какие ACL вы используете.

...