списки переменной длины в ksql - PullRequest
0 голосов
/ 10 июня 2018

В KSQL можно использовать EXTRACTJSONFIELD для вложенных структур, но я не понимаю, как обращаться со списками переменной длины.Например:

{"id":1,"quux":[{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]}

Я могу иметь дело с quux в качестве varchar для основного потока,

create stream mystream (id bigint, quux varchar)
with (kafka_topic='mytopic', value_format='json')

, но я хотел бы иметь возможность превратить это в таблицус:

quuxid x y
1      1 2
1      3 4
1      5 6

Как мне работать со списками переменной длины в KSQL?

1 Ответ

0 голосов
/ 12 июня 2018

В настоящее время это невозможно в KSQL.

Как вы уже видели, вы можете получить доступ к данным массива по индексу, в частности: Заполните тестовые данные:

echo '{"id":1,"quux":[{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]}' | \
kafkacat -b localhost:9092 -t quux

Проверьте сообщение в KSQL:

ksql> print 'quux' from beginning;
Format:JSON
{"ROWTIME":1528791985250,"ROWKEY":"null","id":1,"quux":[{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]}

Создатьпоток:

create stream mystream (id bigint, quux varchar) \
with (kafka_topic='quux', value_format='json');

Поток запроса:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'
ksql>
ksql> select * from mystream;
1528791985250 | null | 1 | [{"x":1,"y":2},{"x":3,"y":4},{"x":5,"y":6}]

Создание потока с помощью array:

ksql> CREATE STREAM mystream2 (id bigint, quux array<varchar>) with (kafka_topic='quux', value_format='json');

 Message
----------------
 Stream created
----------------

Доступ к отдельным элементам по индексу:

ksql> SELECT quux[0], EXTRACTJSONFIELD(quux[0],'$.x') AS X, EXTRACTJSONFIELD(quux[0],'$.y') AS Y from mystream2;
{"x":1,"y":2} | 1 | 2

Но вы ищете эквивалент EXPLODE функции, которой еще нет в KSQL.

Проблемы с github:

...