Kafka Stream Ksql Json - PullRequest
       103

Kafka Stream Ksql Json

2 голосов
/ 03 июля 2019

Кафка стрим / Ksql на самом деле как-то изначально поддерживает json?Какие другие форматы поддерживаются?Я видел, что возможно, чтобы плоский json интерпретировался как таблица.Я хочу понять эту часть немного лучше;Каковы другие форматы, которые kafka-потоки через Ksql, которые могут быть запрошены через SQL?Как это возможно или поддерживается?Что за нативная поддержка?

1 Ответ

3 голосов
/ 03 июля 2019

KSQL

Для форматов значений KSQL поддерживает AVRO, JSON и DELIMITED (например, CSV).

Документацию можно найти здесь:

Потоки Кафки

Kafka Streams поставляется с некоторыми примитивными / базовыми SerD (сериализаторами / десериализаторами) в пакете org.apache.kafka.common.serialization.

Документацию можно найти здесь:

enter image description here

Confluent также предоставляет Avro SerDes, совместимых с реестром схем для данных в стандартном Avro и в определенном формате Avro. Вы можете найти документацию здесь:

Вы также можете использовать базовую реализацию SerDe для JSON , которая поставляется с примерами:

В крайнем случае, вы всегда можете создать свой собственный SerDes . Для этого вы должны:

  1. Напишите сериализатор для вашего типа данных T, реализовав org.apache.kafka.common.serialization.Serializer.
  2. Написать десериализатор для T путем реализации org.apache.kafka.common.serialization.Deserializer.
  3. Написать serde для T, выполнив org.apache.kafka.common.serialization.Serde, что вы либо делаете вручную (см. существующие SerDes в предыдущем разделе) или используя вспомогательные функции в Serdes, такие как Serdes.serdeFrom(Serializer<T>, Deserializer<T>). Обратите внимание, что вы будете необходимо реализовать свой собственный класс (который не имеет общих типов), если вы хотите использовать свой пользовательский serde в конфигурации, предоставленной для KafkaStreams. Если ваш класс serde имеет общие типы или вы используете Serdes.serdeFrom(Serializer<T>, Deserializer<T>), вы можете передать serde только через вызовы методов (например, builder.stream("topicName", Consumed.with(...))).
...