ksql - создание потока из массива json - PullRequest
0 голосов
/ 26 июня 2018

Моя тема кафки состоит в передаче данных в этом формате (начиная с collectd ):

[{"values":[100.000080140372],"dstypes":["derive"],"dsnames":["value"],"time":1529970061.145,"interval":10.000,"host":"k5.orch","plugin":"cpu","plugin_instance":"23","type":"cpu","type_instance":"idle","meta":{"network:received":true}}]

Это комбинация массивов, целых и плавающих чисел ... и все этовнутри массива JSON.В результате я чертовски потратил время, используя ksql , чтобы сделать что-нибудь с этими данными.

Когда я создаю поток «по умолчанию» как

create stream cd_temp with (kafka_topic='ctd_test', value_format='json');

, я получаю такой результат:

ksql> describe cd_temp;

 Field   | Type                      
-------------------------------------
 ROWTIME | BIGINT           (system) 
 ROWKEY  | VARCHAR(STRING)  (system) 
-------------------------------------

Любой выберите вернет ПУТЬ ВРЕМЕНИи 8-значное шестнадцатеричное значение для ROWKEY.

Я потратил некоторое время, пытаясь извлечь поля JSON безрезультатно.Меня беспокоит следующее:

ksql> print 'ctd_test' from beginning;
Format:JSON
com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode

Возможно ли, что эту тему нельзя использовать в ksql ?Есть ли способ распаковать внешний массив, чтобы добраться до интересных бит внутри?

1 Ответ

0 голосов
/ 27 июня 2018

На момент написания (июнь 2018 г.) KSQL не мог обработать сообщение JSON, где все это встроено в массив верхнего уровня.Для отслеживания этой существует проблема github.Я бы предложил добавить +1 голос по этому вопросу, чтобы повысить его приоритетность.

Кроме того, я заметил, что ваш оператор создания потока не определяет схему сообщения json.Хотя это не поможет в этой ситуации, это то, что вам нужно для других форматов ввода Json, т.е. вы создаете оператор должен быть что-то вроде:

create stream cd_temp (values ARRAY<DOUBLE>, dstypes ARRAY<VARCHAR>, etc) with (kafka_topic='ctd_test', value_format='json');
...