Как передать структурированные записи прямо из K SQL в коннектор (например, InfluxDB) - PullRequest
1 голос
/ 29 мая 2020

Я пытаюсь передать данные напрямую из K SQL в InfluxDB (или любой другой коннектор, требующий определений). Я могу заставить все работать в простом случае, но у меня возникают проблемы, когда схема требует сложных типов. (То есть, теги для InfuxDB).

Вот пример моего потока / схемы:

 Field    | Type                                                   
-------------------------------------------------------------------
 ROWKEY   | VARCHAR(STRING)  (primary key)
 FIELD_1  | VARCHAR(STRING)                                        
 FIELD_2  | VARCHAR(STRING)                                        
 FIELD_3  | VARCHAR(STRING)                                        
 FIELD_4  | DOUBLE                                                 
 TAGS     | MAP<STRING, VARCHAR(STRING)> 

Если я вручную создаю схему AVRO и заполняю записи от простого производителя, я могу прочтите руководство по началу работы здесь и вставьте теги для InfluxDB.

Однако, когда я перейду на K SQL, если я попытаюсь напрямую потопить поток AVRO в InfluxDB, я теряю информацию о сложных типах (тегах). Я заметил предупреждение из этого сообщения в блоге : «Предупреждение ksqlDB / K SQL еще не может записывать данные в формате Avro, совместимом с этим коннектором»

Затем я пытаюсь преобразовать Поток AVRO в формат JSON, но теперь я понимаю, что мне придется указывать схему в каждой записи, подобно тому, что задает этот вопрос . Мне не удалось преобразовать поток AVRO в поток JSON и одновременно встроить схему и полезную нагрузку.

Наконец, я вижу «решение для покачивания» с kafkacat, но это заставило бы меня выгружать записи из K SQL в kafkacat, а затем обратно в Kafka, прежде чем, наконец, добраться до Influx.

Есть ли способ загружать сложные записи прямо из K SQL в формате JSON или AVRO в разъем?

1 Ответ

1 голос
/ 02 июня 2020

Я бы предположил, что причина, по которой ksqlDB еще не может выводить данные AVRO в формате, требуемом InfluxDB, заключается в том, что он не выводит поле TAGS как тип Avro map из-за того, что карты Avro требуют ненулевого значения key и тип SQL MAP<STRING, STRING>, разрешающий пустые ключи. Следовательно, ksqlDB сериализует карту как Avro array записей ключ-значение.

Чтобы что-то работало с Avro, вам понадобится:

  1. Поддержка ненулевых типов : https://github.com/confluentinc/ksql/issues/4436 или
  2. Поддержка использования существующей схемы Avro: https://github.com/confluentinc/ksql/issues/3634

Пожалуйста, проголосуйте за / прокомментируйте эти проблемы, чтобы повысить их профили.

Раньше решение на основе JSON не работало, потому что, как вы указали, соединитель требует схемы JSON, встроенной в полезную нагрузку. Однако самая последняя версия Confluent Platform / Schema Registry поддерживает схемы JSON в реестре схем. Следовательно, хотя я еще не пробовал, обновление до последней версии CP может означать, что решение на основе JSON будет работать. Если нет, вероятно, стоит поднять заявку на Jira / Github, чтобы обновить соответствующий компонент, чтобы это работало.

...