MSK - это в основном кластер vanilla apache kafka, настроенный и управляемый aws (с предопределенными параметрами конфигурации, основанными на типе экземпляра кластера, количестве посредников и т. Д.), Настроенный для облачной среды.
В идеале это должно бытьспособен выполнять все / большинство вещей, которые поддерживает Kafka с открытым исходным кодом.Также, если у вас есть конкретный вариант использования или требование, которое не задокументировано, я предлагаю вам связаться со службой поддержки AWS для получения дополнительных разъяснений относительно управляемой части кластера kafka (максимально допустимое количество брокеров, надежность, стоимость).
Iпостараюсь ответить на ваши вопросы исходя из моего личного опыта:
i) Как я могу получить доступ к AWS MSK с клиентами kafka, работающими в моей локальной системе?
Выне может получить доступ к MSK напрямую с локальной или локальной машины, используя клиент kafka или поток kafka.Поскольку URL-адрес брокера, строка соединения zookeeper являются частными IP-адресами кластера msk vpc / subnet.Чтобы получить доступ через клиент kafka, вам нужно запустить экземпляр ec2 в том же vpc MsK и выполнить клиент kafka (производитель / потребитель) для доступа к кластеру msk.
Для доступа к кластеру MSK с локального компьютера или локального компьютера.В системах вы можете настроить инфраструктуру kafka Rest Proxy с открытым исходным кодом от Confluent для доступа к кластеру MSK из внешнего мира через rest api.Эта структура не является полноценным клиентом kafka и не позволяет выполнять все операции клиента kafka, но вы можете выполнять большинство операций на кластере, начиная с выборки метаданных кластера, информации о теме, создания и потребления сообщения и т. Д.
Сначала настройте группу безопасности экземпляра repo и ec2 (см. - Раздел-1: Предварительная установка или настройка - дополнительные компоненты kafka ), а затем установите / настройте оставшийся прокси kafka.
sudo yum install confluent-kafka-rest
Создайте имя файла kafka-rest.properties и добавьте следующее содержимое:
bootstrap.servers=PLAINTEXT://10.0.10.106:9092,PLAINTEXT://10.0.20.27:9092,PLAINTEXT://10.0.0.119:9092
zookeeper.connect=10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181
schema.registry.url=http://localhost:8081
** измените загрузчик и URL / ips zookeeper.
Запуск сервера отдыха
kafka-rest-start kafka-rest.properties &
Доступ к MSK через API отдыха с помощью curl или rest клиент / браузер.
Получение списка тем
curl "http://localhost:8082/topics"
curl "http://<ec2 instance public ip>:8082/topics"
Чтобы получить доступ с локального или локального компьютера, убедитесь, что к экземпляру ec2, на котором работает остальной сервер, подключен публичный ip или эластичный ip.
Больше операций API отдыха https://github.com/confluentinc/kafka-rest
ii) Поддерживает ли MSK эволюцию схемы и точную семантику?
Вы можете использовать авро-сообщение вместе с 'Реестр схемы' для достижения эволюции схемы и обслуживания схемы.
Установка и настройка реестра схемы аналогичны слиянию прокси-сервера kafka-rest.
sudo yum install confluent-schema-registry
Создание файлаНазовите schema-registry.propertie и добавьте следующее содержимое:
listeners=http://0.0.0.0:8081
kafkastore.connection.url=10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181
kafkastore.bootstrap.servers=PLAINTEXT://10.0.10.106:9092,PLAINTEXT://10.0.20.27:9092,PLAINTEXT://10.0.0.119:9092
kafkastore.topic=_schemas
debug=false
** измените URL-адрес загрузочного загрузчика и zookeeper (соединение) / ips.
Запустите службу реестра схемы
schema-registry-start schema-registry.properties &
Для получения дополнительной информации см. https://github.com/confluentinc/schema-registry
https://docs.confluent.io/current/schema-registry/docs/schema_registry_tutorial.html
Точно, как только семантика является функцией apache kafkaи хотя я не тестировал его на msk, я считаю, что он должен поддерживать эту функцию, поскольку он является частью только Apache kafka с открытым исходным кодом.
iii) Будет ли MSK каким-либо способом обновить какой-либо кластер илинастройка конфигурации? Подобно тому, как aws glue обеспечивает изменение параметров для искрового исполнителя и памяти драйвера в их управляемой среде.
Да, параметр конфигурации можно изменить во время выполнения.Я проверил, изменив параметр retention.ms с помощью инструмента настройки kafka, и это изменение было немедленно применено к теме.Поэтому я думаю, что вы можете обновить и другие параметры, но MSK может не разрешить все изменения конфигурации, точно так же как AWS glue допускает только несколько изменений параметров конфигурации свечи, потому что разрешение изменения всех параметров пользователем может быть уязвимо для управляемой среды.
Изменить с помощью инструмента настройки kafka
kafka-configs.sh --zookeeper 10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181 --entity-type topics --entity-name jsontest --alter --add-config retention.ms=128000
Подтверждено изменение с помощью rest
curl "http://localhost:8082/topics/jsontest"
iv) Можно ли интегрировать MSK с другими сервисами AWS (например, Redshift, EMR и т. Д.)?
Да, вы можете подключиться / интегрироваться в другой сервис aws с MSK. Например, вы можете запустить клиент (потребитель) Kafka для чтения данных из kafka и записи в reddshift, rds, s3 или Dynamodb. Убедитесь, что клиент kafka работает на экземпляре ec2 (внутри msk vpc), которому назначена роль iam для доступа к этой службе, а экземпляр ec2 находится в публичной подсети или в частной подсети (с конечной точкой NAT или vpc для s3).
Также вы можете запустить EMR внутри кластеров MSK vpc / subnet, а затем через EMR (spark) вы можете подключиться к другой службе.
Потоковая структура Spark с AWS Managed Service Kafka
Запуск кластера EMR в vpc кластера MSK
Разрешите Группу безопасности EMR Master и Slave во входящем правиле группы безопасности кластеров MSK для порта 9092
Запустить Spark shell
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0
Подключение к кластеру MSK из потоковой структуры искры
val kafka = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "10.0.10.106:9092,10.0.20.27:9092,10.0.0.119:9092").option("subscribe", "jsontest") .load()
Начать чтение / печать сообщения на консоли
val df=kafka.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("console").start()
или
val df=kafka.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()
v) Могу ли я использовать потоковый sql с MSK через ksql? Как я могу настроить KSQL с MSK?
Да, вы можете настроить KSQL с кластером MSK. По сути, вам нужно запустить экземпляр ec2 в той же vpc / подсети кластера MSK. А затем установите сервер ksql + клиент в экземпляре ec2 и используйте его.
Сначала настройте группу безопасности слитного репо и экземпляра ec2 (см. - Раздел-1: Предварительная установка или настройка - дополнительные компоненты kafka ), а затем установите / настройте сервер / клиент Ksql.
После этого установите сервер ksql
sudo yum install confluent-ksql
Создайте имя файла ksql-server.properties и добавьте следующий контент-
bootstrap.servers=10.0.10.106:9092,10.0.20.27:9092,10.0.0.119:9092
listeners=http://localhost:8088
** изменить загрузочный сервер ips / url.
Запустить сервер ksql
ksql-server-start ksql-server.properties &
После этого запустите ksql cli
ksql http://localhost:8088
И, наконец, запустите команду, чтобы получить список тем
ksql> SHOW TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------------
_schemas | false | 1 | 3 | 0 | 0
jsontest | false | 1 | 3 | 1 | 1
----------------------------- --------------------------------------------------
Обратитесь за дополнительной информацией
https://github.com/confluentinc/ksql
vi) Как выполнить прогнозный анализ данных, проходящих через MSK, в режиме реального времени?
Выполнение прогностического анализа или машинного обучения в режиме реального времени на самом деле не относится к MSK. То же, что вы будете делать с кластером kafka (или любым потоковым конвейером), то же самое применимо к MSK. Существуют различные способы достижения ваших целей, но я опишу наиболее распространенный или широко используемый в отрасли:
Использование Spark с MSK (kafka) и выполнение анализа с помощью потоковой передачи структуры и MLIB (с вашей прогнозной моделью).
Вы можете обучить свою прогностическую модель в H20.ai framework, а затем экспортировать модель как java pojo. А затем интегрируйте модель java pojo с пользовательским кодом kafka, который обработает сообщение из темы msk (kafka) и выполнит анализ в реальном времени.
Вы можете обучать модели и развертывать в sagemaker, а затем вызывать из клиентского кода клиента kafka для получения прогноза в реальном времени, вызывая конечную точку вывода модели sagemaker на основе данных / сообщений kafka.
vii) Кроме того, насколько надежен MSK по сравнению с другим облачным кластером kafka из Azure / confluent и какой-либо тест производительности по сравнению с vanilla kafka? И какое максимальное количество брокеров может быть запущено в кластере?
MSK в предварительном просмотре, как вы уже знаете, поэтому пока рано говорить о его надежности. Но в целом, как и все другие сервисы AWS, он должен стать более надежным со временем, а также, надеюсь, с новыми функциями и лучшей документацией.
Я не думаю, что AWS или какой-либо облачный поставщик Azure. Облако Google предоставляет эталонный тест производительности своих сервисов, поэтому вам придется попробовать тестирование производительности с вашей стороны.А клиенты / инструменты kafka ( kafka-producer-perf-test.sh, kafka-consumer-perf-test.sh ) предоставляют сценарий тестирования производительности, который можно выполнить для достижения производительностиИдея кластера.Опять же, тестирование производительности сервиса в реальном производственном сценарии будет сильно различаться в зависимости от различных факторов, таких как (размер сообщения, объем данных, поступающих на kafka, производитель синхронизации или асинхронный вызов, количество потребителей и т. Д.) И производительность будет снижаться до определенного уровня.случай использования, а не общий тест.
Относительно максимального числа брокеров, поддерживаемых в кластере, лучше спросить ребят из AWS через их систему поддержки.
Section-1: Предварительная установка или настройка - дополнительные компоненты kafka:
Запуск экземпляра Ec2 в vpc / подсети кластера MSK.
Вход в экземпляр ec2
Установитьup yum repo для загрузки пакетов компонентов kafka с использованием yum
sudo yum install curl which
sudo rpm --import https://packages.confluent.io/rpm/5.1/archive.key
Перейдите в /etc/yum.repos.d/ и создайте файл с именем confluent.repo и добавьте следующее содержимое
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
Следующее чистое репозиторий yum
sudo yum clean all
Разрешить группу безопасности экземпляра ec2 во входящих правилах группы безопасности кластеров MSK для порта 9092(соединяющий посредник) и 2081 (соединяющий zookeeper).
Section-2: Команда для получения брокера MSK и информации о URL / ip zookeeper
URL-порт соединения Zookeeper
aws kafka describe-cluster --region us-east-1 --cluster-arn <cluster arn>
URL-адрес соединения с брокером
aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn <cluster arn>
----------------------------------------------------------------------
Примечание:
Обзор MSK и настройки компонентов:
Пожалуйста, ознакомьтесь с высокоуровневой архитектурой MSK и настройкой различных компонентов (отдых, реестр схем, балансировка нагрузки и т. Д.).Также, как это будет связано с другими сервисами AWS.Это всего лишь простая эталонная архитектура.
Кроме того, вместо настройки rest, реестра схемы и ksql в экземпляре ec2 вы также можете докеризировать внутри контейнера.
И если вы настраиваете несколько прокси-серверов отдыха, то вам нужно поместить эту службу прокси-сервера за липким балансировщиком нагрузки (например, nginx, использующий хэш ip), чтобы убедиться, что один и тот же клиентский клиент сопоставляется с той же группой потребителей, чтобыизбегайте несоответствия / несоответствия при загрузке данных при чтении данных.
Надеюсь, вы найдете вышеуказанную информацию полезной !!