Использование KSQL-CLI с темой kafka, сообщения которой являются объектами JSON.Я хочу извлечь поля, такие как obj.updaterId, не объявляя исчерпывающих определений полей STRUCT или MAP.
{
"id": "5ba8f7e6b93c7964efb00f48",
"source": "ShareService",
"obj": {
"updaterId": "systems@test.com",
"desc": "foobar",
"name": "com.test.auto.sensor"
}
}
Я могу успешно создать поток многими способами, самым простым из которых является:
CREATE STREAM objs1 (obj VARCHAR) WITH (kafka_topic='json-topic', value_format='JSON');
Простой выбор работает, как и ожидалось, вы можете увидеть содержимое obj ...
SELECT * FROM objs1;
1537804190394 |"5ba8f7e6b93c7964efb00f48" |{name = com.test.auto.sensor, updaterId=systems@test.com, desc = foobar}
Здесь не работает любая попытка извлечь поле JSON из obj с использованием EXTRACTJSONFIELD,Ответ как для объектов верхнего уровня, так и для вложенных объектов - 'null'.
SELECT EXTRACTJSONFIELD(obj, '$.updaterId') AS updater FROM objs1;
null
В есть документация ksql это говорит о том, что если данные являются фактическим объектом в столбце STRING, я мог бы вместо этого использовать STRUCT.Это не говорит о том, что я ДОЛЖЕН использовать STRUCT.
Кстати, использование STRUCT работает, но я заинтересован в EXTRACTJSONFIELD, потому что глубокие структуры моих сообщений будут различаться.Другими словами, иногда ожидается нулевой ответ, если сообщения не содержат глубокую структуру.
Работы:
CREATE STREAM objs1 (obj STRUCT<updaterId VARCHAR>) WITH (kafka_topic='json-topic', value_format='JSON');
SELECT OBJ->updaterId AS updater FROM OBJS1;
Клянусь, я вижу другие примеры в вопросах других людей, которые, кажется, работают для подобных договоренностей.Чего мне не хватает?
Примечание. Я упростил свой JSON для этого поста.Это более крупный и более вложенный IRL, но, как мне кажется, этот более простой пример точен.
KSQL версии 5.0.0 для OSX.