Строковое поле в KSQL TABLE или STREAM, содержащее часть исходного сообщения JSON - PullRequest
0 голосов
/ 01 февраля 2019

Можно ли добавить строковое поле в таблицу / поток KSQL, которое будет содержать часть JSON исходного сообщения.

Например,

Исходное сообщение:

{userId:12345, 
 service:"service-1", 
 "debug":{
          "msg":"Debug message", 
          "timer": 11.12}
}

Итак, нам нужно сопоставить userId с userId BIGINT, service с service STRING и debug с debug STRING, которые будут содержать {"msg":"Debug message", "timer": 11.12} в виде строки.

1 Ответ

0 голосов
/ 04 февраля 2019

Да, вы можете просто объявить его как VARCHAR.Оттуда вы можете рассматривать его как просто строку, которая оказывается JSON, или вы можете манипулировать ею далее с помощью функции EXTRACTJSONFIELD.

Отправить образец сообщения в тему:

echo '{"userId":12345, "service":"service-1", "debug":{ "msg":"Debug message", "timer": 11.12} }' | kafkacat -b localhost:9092 -t test_topic -P

Объявить поток:

ksql> CREATE STREAM demo (userid BIGINT, service VARCHAR, debug VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

Запросить столбцы:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT USERID, SERVICE, DEBUG FROM demo;
12345 | service-1 | {"msg":"Debug message","timer":11.12}

Доступ к вложенному JSONполя:

ksql> SELECT USERID, SERVICE, EXTRACTJSONFIELD(DEBUG,'$.msg') FROM demo;
12345 | service-1 | Debug message

ksql> SELECT USERID, SERVICE, EXTRACTJSONFIELD(DEBUG,'$.timer') FROM demo;
12345 | service-1 | 11.12
...