Как создать Datalake, используя Apache Kafka, Amazon Glue и Amazon S3? - PullRequest
0 голосов
/ 05 октября 2018

Я хочу сохранить все данные из темы Кафки в Amazon S3.У меня есть кластер Kafka, который получает в одной теме 200 000 сообщений в секунду, и каждое сообщение со значением имеет 50 полей (строки, временные метки, целые числа и числа с плавающей запятой).

Моя основная идея заключается в использовании Kafka Connector для храненияданные в контейнере s3 и после этого используйте Amazon Glue для преобразования данных и сохранения их в другом контейнере.У меня есть следующие вопросы:

1) Как это сделать?Эта архитектура будет хорошо работать?Я пытался с Amazon EMR (Spark Streaming), но у меня было слишком много проблем Как уменьшить время обработки и сбой задач с использованием Apache Spark для потоковой передачи событий от Apache Kafka?

2) Я пыталсяиспользовать Kafka Connect из Confluent, но у меня есть несколько вопросов:

  • Можно ли подключиться к моему кластеру Kafka из другого экземпляра Kafka и запустить автономно мой Kafka Connector s3?

  • Что означает эта ошибка «Ошибка задачи s3-sink-0 вызвала необнаруженное
    неисправимое исключение»?

ОШИБКА Задача s3-sink-0 вызвала необработанное и неустранимое исключение (org.apache.kafka.connect.runtime.WorkerTask: 142) java.lang.NullPointerException при io.confluent.connect.hdfs.HdfsSinkTask.close (HdfsSinkTask.java:122) в org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets (WorkerSinkTask.java:290) в org.apache.orkunta.closePartitions (WorkerSinkTask.java:421) в org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:146) в org.apache.kafka.connect.runtime.WorkerTask.T1kj) в org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:175) в java.util.concurrent.Executors $ RunnableAdapter.call (Executors.java:511) в java.util.concurrent.FutureTask.выполнить (FutureTask.java:266) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.javaThlanutor at javaTh6g at at).run (Thread.java: 745) [2018-10-05 15: 32: 26,086] ОШИБКА Задача уничтожается и не будет восстановлена ​​до перезапуска вручную (org.apache.kafka.connect.runtime.WorkerTask: 143) [2018-10-05 15:32: 27,980] WARN не удалось создать Dir, используя каталог из файла url: / targ.пропуская.(org.reflections.Reflections: 104) java.lang.NullPointerException в org.reflections.vfs.Vfs $ DefaultUrlTypes $ 3.matches (Vfs.java:239) в org.reflections.vfs.Vfs.fromURL (Vfs.java:98)) в org.reflections.vfs.Vfs.fromURL (Vfs.java:91) в org.reflections.Reflections.scan (Reflections.java:237) в org.reflections.Reflections.scan (Reflections.java:204) в org.reflections.Reflections. (Reflections.java:129) в org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins (AbstractHerder.java:268) в org.apache.kafka.connect.runtime.AbstractHerder $ 1.run (AbstractHerer.java: 377) at java.lang.Thread.run (Thread.java:745) [2018-10-05 15: 32: 27,981] WARN не удалось создать Vfs.Dir из URL.игнорирование исключения и продолжение (org.reflections.Reflections: 208) org.reflections.ReflectionsException: не удалось создать Vfs.Dir из URL-адреса, не найдено соответствующего UrlType [файл: / targ], либо использовать fromURL (окончательный URL-адрес, окончательный список)urlTypes) или используйте статический setDefaultURLTypes (окончательный список urlTypes) или addDefaultURLTypes (UrlType urlType) с вашим специализированным UrlType.в org.reflections.vfs.Vfs.fromURL (Vfs.java:109) в org.reflections.vfs.Vfs.fromURL (Vfs.java:91) в org.reflections.Reflections.scan (Reflections.java:237) вorg.reflections.Reflections.scan (Reflections.java:204) в org.reflections.Reflections. (Reflections.java:129) в org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins (AbstractHerder.java:268) вorg.apache.kafka.connect.runtime.AbstractHerder $ 1.run (AbstractHerder.java:377)at java.lang.Thread.run (Thread.java:745) [2018-10-05 15: 32: 35,441] INFO Reflections потребовалось 12393 мс для сканирования 429 URL-адресов, что позволило получить 13521 ключ и 95814 значений (org.reflections.Reflections:229)

  • Если вы можете возобновить шаги для подключения к Kafka и продолжить на s3 с
    другого экземпляра Kafka, как вы будете это делать?
  • Что значит всеэти поля key.converter, value.converter, key.converter.schemas.enable, value.converter.schemas.enable, internal.key.converter, internal.value.converter, internal.key.converter.schemas.enable, internal.value.converter.schemas.enable?

Каковы возможные значения для key.converter, value.converter?

3) Как только мои исходные данные будут в корзине, я быкак использовать Amazon Glue для получения этих данных, десериализации Protobuffer, изменения формата некоторых полей и, наконец, для сохранения их в другом контейнере в Parquet.Как я могу использовать свою собственную библиотеку java protobuffer в Amazon Glue?

4) Если я хочу сделать запрос с помощью Amazon Athena, как я могу автоматически загрузить разделы (год, месяц, день, час)?С гусеницами и планировщиками Amazon Glue?

Ответы [ 2 ]

0 голосов
/ 02 января 2019

В дополнение к ответу @ cricket_007

Могу ли я подключиться к своему кластеру Kafka из другого экземпляра Kafka и запустить автономно мой Kafka Connector s3?

Kafka S3Connector является частью дистрибутива Confluent, который также включает в себя Kafka, а также другие связанные службы, но он не предназначен для непосредственного запуска на ваших брокерах, а скорее:

  • в качестве отдельного работника, работающего с Connector.конфигурация указывается при запуске службы
  • или в качестве дополнительного рабочего кластера, работающего на стороне вашего кластера Kafka Brokers.В этом случае взаимодействие / запуск соединителей лучше с помощью API-интерфейса Kafka Connect REST (поиск «Управление соединителями Kafka» для документации с примерами)

Если вы можете возобновить шаги для подключенияв Кафку и продолжать s3 из другого экземпляра Кафки, как вы будете?

Вы говорите о другом экземпляре Кафки Connect ?

  • , еслиТаким образом, вы можете просто запустить службу Kafka Connect в распределенном режиме, который должен был обеспечить надежность, которую вы, похоже, ищете ...

Или вы имеете в виду другого Kafka (брокера) кластер ?

  • в этом случае вы можете попробовать (но это было бы экспериментально, а я сам не пробовал ...) запустить Kafka Connect в автономном режиме и просто обновитьbootstrap.servers параметр конфигурации вашего соединителя, указывающий на новый кластер.Почему это может сработать: в автономном режиме смещения разъема (ов) приемника хранятся локально на вашем рабочем месте (в отличие от распределенного режима, где смещения хранятся непосредственно в кластере Kafka ...).Почему это может не сработать: оно просто не предназначено для этого использования, и я предполагаю, что вам могут понадобиться, чтобы ваши темы и разделы были абсолютно одинаковыми ...?

Каковы возможныезначения для key.converter, value.converter?

Проверьте Документация Confluent для kafka-connect-s3 ;)

Как использовать мойсобственная библиотека java protobuffer в Amazon Glue?

Не уверен в реальном методе, но задания Glue порождают кластер EMR за кулисами, поэтому я не понимаю, почему это не должно быть возможно ...

Если я хочу сделать запрос в Amazon Athena, как я могу автоматически загрузить разделы (год, месяц, день, час)?С гусеницами и планировщиками Amazon Glue?

Да.

Предполагая ежедневное разбиение разделов, вы действительно можете запланировать запуск сканера по утрам, как можно скорее.поскольку вы можете ожидать, что новые данные создали папку этого дня на S3 (поэтому на S3 существует по крайней мере один объект для этого дня) ... Сканер добавит раздел этого дня, который затем будет доступен для запросов с любым вновь добавленным объектом.

0 голосов
/ 05 октября 2018

Мы используем S3 Connect для сотен тем и обрабатываем данные, используя Hive, Athena, Spark, Presto и т. Д. Кажется, все работает нормально, хотя я чувствую, что реальная база данных может возвращать результаты быстрее.

В любом случае, чтобы ответить о Connect

Могу ли я подключиться к своему кластеру Kafka из другого экземпляра Kafka и запустить автономно мой Kafka Connector s3?

Я не уверен, что понимаю вопрос, но Kafka Connect необходимо подключиться к одному кластеру, вам не нужны два кластера Kafka, чтобы использовать его.Обычно вы запускаете процессы Kafka Connect как часть их собственного кластера, а не посредников.

Что означает эта ошибка «Ошибка задачи s3-sink-0 вызвала необнаруженное неисправимое исключение»?

Это означает, что вам нужно просмотреть журналы, чтобы выяснить,какое исключение выбрасывается и мешает соединителю читать данные.

WARN could not create Dir using directory from url file:/targ ... Если вы используете разъем HDFS, я не думаю, что вы должны использовать файл по умолчанию: // URI

Если вы можете возобновитьшаги для подключения к Kafka и продолжения s3 с другого экземпляра Kafka, как вы будете это делать?

Вы не можете "возобновить работу с другого экземпляра Kafka".Как уже упоминалось, Connect может использовать только один кластер Kafka, и любые использованные смещения и группы потребителей сохраняются вместе с ним.

Что означают все эти поля

Эти поля удалены из последних выпусков Kafka, их можно игнорировать.Вам определенно не следует менять их

internal.key.converter,internal.value.converter, internal.key.converter.schemas.enable, internal.value.converter.schemas.enable

Это ваши сериализаторы и десериализаторы, как у обычного API-интерфейса для производителей

key.converter, value.converter

Iсчитаю, что это важно только для JSON-конвертеров.См. https://rmoff.net/2017/09/06/kafka-connect-jsondeserializer-with-schemas-enable-requires-schema-and-payload-fields

key.converter.schemas.enable, value.converter.schemas.enable

, чтобы десериализовать Protobuf, изменить формат некоторых полей и, наконец, сохранить его в другом контейнере в Parquet

Kafka Connect должен быть загружен конвертером Protobuf, и я не знаю, есть ли он (я думаю, что Blue Apron что-то написал ... Search github).

Вообще говоря, Avro будетгораздо проще конвертировать в паркет, потому что для этого уже существуют нативные библиотеки.S3 Connect by Confluent в настоящее время не пишет формат Паркет, но существует в открытом PR.Альтернативой является использование библиотеки Pinterest Secor .

Я не знаю Glue, но если это похоже на Hive, вы будете использовать ADD JAR во время запроса для загрузки внешних плагинов и функций кода

У меня минимальный опыт работы с Athena, но Glueподдерживает все разделы в качестве метастафа Hive.Автоматическая часть была бы сканером, вы можете поставить фильтр на запрос, чтобы сделать сокращение раздела

...