Лучший способ сделать это - использовать Avro, в которой схема хранится отдельно и автоматически используется Kafka Connect и KSQL.
Вы можете использовать Avro, настроив Kafka Connect для использования AvroConverter.В вашем рабочем наборе конфигурации Kafka Connect:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
( Обновите schema-registry
до имени хоста, на котором работает реестр схем )
Оттуда, в KSQLвы просто используете
CREATE STREAM my_stream WITH (KAFKA_TOPIC='source_topic', VALUE_FORMAT='AVRO');
Здесь вам не нужно указывать саму схему, поскольку KSQL извлекает ее из реестра схем.
Подробнее о конвертерах и сериализаторах можно прочитать здесь .
Отказ от ответственности: я работаю в Confluent и написал упомянутое сообщение в блоге.