Обработка Avro в ksqlDB без реестра схем - PullRequest
0 голосов
/ 05 мая 2020

Я работаю с развертыванием Kafka, которое не с использованием любого реестра схем .
События во всех темах: Avro. У меня есть доступ к схеме, она просто не хранится в Schema Registry.

Теперь в документации ksqlDB явно указано , что вам нужен реестр схем, чтобы иметь возможность обрабатывать события Avro :

Стеки с реестром схем могут использовать события в кодировке Avro и Protobuf в приложениях ksqlDB. Без реестра схемы ваши приложения ksqlDB могут использовать только JSON или форматы с разделителями.

Я ищу обходной путь к этой проблеме, чтобы иметь возможность определять потоки и таблицы ksqlDB в этих темах событий Avro. У меня есть несколько идей, и я хотел бы знать, верны ли они или есть другое решение

  • Используйте Kafka API (потребитель + производитель или потоковый API) для преобразования в новые темы, от Avro до Json.
  • Используйте коннектор Kafka Connect для преобразования. Большинство разъемов подключаются к Kafka или из него. Идея здесь в том, чтобы подключиться от одного Kafka topi c к другому. Не уверен, возможно ли
  • Создайте какое-то другое определение, которое позволит ksqlDB анализировать события Avro (возможно, явно определить схему потока ksqlDB SQL)

Хотелось бы знать ff есть конкретный c пример реализации любого из них.

1 Ответ

2 голосов
/ 06 мая 2020

В этой ситуации использование реестра схем настоятельно рекомендуется . Это автономное развертывание JVM (как и ksqlDB) с той же лицензией (Confluent Community License), что и ksqlDB. Фактически, на pu sh вы даже можете запустить его на той же машине, что и ksqlDB.

Помимо того факта, что запускать реестр схем проще, чем приведенное ниже решение, отказавшись от использования реестра схем, вы теряете все его преимущества (проверки совместимости, слабая связь между производителем и потребителем и т. Д.) 1017 * et c)


Но, если вы абсолютно не можете запустить реестр схем, то вашей отправной точкой является понимание того, как Avro, установленный на вашем Kafka topi c, был сериализован. Если это было с сериализатором реестра схем (который обычно используется для данных Avro по темам Kafka), вам необходимо сначала десериализовать его с помощью десериализатора реестра схем, доступного для клиентских библиотек (например, Java). Если это просто двоичный Avro с отдельным avs c (что, похоже, может быть), вы просто читаете его как это.

Если вы хотите использовать данные с ksqlDB, то я думаю, что ваше предложение о повторной сериализации до JSON, вероятно, будет лучшим выбором.

Таким образом, у вас должен быть пользовательский потребитель для чтения Avro, десериализации с использованием схемы, которую вы храните, а затем пользовательский производитель для записи в новый Kafka topi c как JSON.

Оттуда вы можете использовать ksqlDB для чтения данных JSON, только с дополнительным раздражением из-за необходимости вручную вводить схему, которая у вас уже есть в Avro;)

CREATE STREAM my_source (COL1 VARCHAR, 
                         COL2 BIGINT) 
  WITH (KAFKA_TOPIC='my_json_topic', 
        VALUE_FORMAT='JSON');
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...