Определите KSQL STRUCT для JSON-значимой темы с разными типами - PullRequest
0 голосов
/ 23 ноября 2018

( Редактировать : небольшие правки для лучшего отражения намерения, но большие правки из-за достигнутого прогресса.)

Тема "t_raw" содержит сообщения нескольких типов, где все они содержатобщий ключ "type":

{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}

В конечном счете, мне нужно разделить это на другие потоки, где они будут разделены / агрегированы / обработаны.Я хотел бы иметь возможность использовать STRUCT для всего, но мои текущие усилия заставили меня сделать это:

create stream raw (type varchar, data varchar) \
  with (kafka_topic='t_raw', value_format='JSON');

для первого уровня, затем

create stream key1 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
  select \
    extractjsonfield(data, '$.ts') as ts, \
    extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.b') as b \
  from raw where type='key1';
create stream key2 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
  select \
    extractjsonfield(data, '$.ts') as ts, \
    extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.c') as c, \
    extractjsonfield(data, '$.d') as d \
  from raw where type='key2';

Это кажетсяработать, но с недавним добавлением STRUCT, есть ли способ использовать его вместо extractjsonfield, как сделано выше?

ksql> select * from key1;
1542741621100 | null | 2018-11-20 19:20:21.1 | 1 | hello
1542741623300 | null | 2018-11-20 19:20:23.3 | 2 | hello2
^CQuery terminated
ksql> select * from key2;
1542741622200 | null | 2018-11-20 19:20:22.2 | 1 | 11 | goodbye
1542741624400 | null | 2018-11-20 19:20:24.4 | 3 | 22 | goodbye2

Если не с STRUCT, есть ли прямая-передний способ сделать это с помощью vanilla kafka-streams (тиски ksql, т. е. тег )?

Есть ли более кафка-эск / эффективный / элегантныйспособ разобрать это?Я не могу определить его как пустое STRUCT<>

ksql> CREATE STREAM some_input ( type VARCHAR, data struct<> ) \
      WITH (KAFKA_TOPIC='t1', VALUE_FORMAT='JSON');
line 1:52: extraneous input '<>' expecting {',', ')'}

Существует некоторое (не совсем недавнее) обсуждение о возможности сделать что-то вроде

CREATE STREAM key1 ( a INT, b VARCHAR ) AS \
  SELECT data->* from some_input where type = 'key1';

К вашему сведению: вышеприведенное решение не будет работать в confluent-5.0.0, , в недавнем патче исправлена ​​ошибка extractjsonfield и включено это решение.

В реальных данных есть еще несколько похожихтипы сообщений.Все они содержат ключи "type" и "data" (и никаких других на верхнем уровне), и почти все имеют эквивалент "ts" метки времени, вложенный в "data".

1 Ответ

0 голосов
/ 23 ноября 2018

Да, вы можете сделать это - KSQL не возражает, если столбец не существует, вы просто получаете значение null.

Настройка тестовых данных

Заполните некоторые тестовые данные в теме:

kafkacat -b kafka:29092 -t t_raw -P <<EOF
{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
EOF

Вывести тему в консоль KSQL для проверки:

ksql> PRINT 't_raw' FROM BEGINNING;
Format:JSON
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"ROWTIME":1542965737437,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
^CTopic printing ceased
ksql>

Смоделируйте поток данных источника

Создайте поток поверх него.Обратите внимание на использование STRUCT и ссылку на каждый возможный столбец:

CREATE STREAM T (TYPE VARCHAR, \
                 DATA STRUCT< \
                      TS VARCHAR, \
                      A INT, \
                      B VARCHAR, \
                      C INT, \
                      D VARCHAR>) \
        WITH (KAFKA_TOPIC='t_raw',\
              VALUE_FORMAT='JSON');

Установите смещение на самое раннее, чтобы мы запросили всю тему, а затем используйте KSQL для доступа к полному потоку:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql>
ksql> SELECT * FROM T;
1542965737436 | null | key1 | {TS=2018-11-20 19:20:21.1, A=1, B=hello, C=null, D=null}
1542965737436 | null | key2 | {TS=2018-11-20 19:20:22.2, A=1, B=null, C=11, D=goodbye}
1542965737436 | null | key1 | {TS=2018-11-20 19:20:23.3, A=2, B=hello2, C=null, D=null}
1542965737437 | null | key2 | {TS=2018-11-20 19:20:24.4, A=3, B=null, C=22, D=goodbye2}
^CQuery terminated

Запросите типы по отдельности, используя оператор -> для доступа к вложенным элементам:

ksql> SELECT DATA->A,DATA->B FROM T WHERE TYPE='key1'  LIMIT 2;
1 | hello
2 | hello2

ksql> SELECT DATA->A,DATA->C,DATA->D FROM T WHERE TYPE='key2' LIMIT 2;
1 | 11 | goodbye
3 | 22 | goodbye2

Сохраните данные в отдельных темах Kafka:

Заполните целевые темы разделеннымиданные:

ksql> CREATE STREAM TYPE_1 AS SELECT DATA->TS, DATA->A, DATA->B FROM T WHERE TYPE='key1';

Message
----------------------------
Stream created and running
----------------------------
ksql> CREATE STREAM TYPE_2 AS SELECT DATA->TS, DATA->A, DATA->C, DATA->D FROM T WHERE TYPE='key2';

Message
----------------------------
Stream created and running
----------------------------

Схема для новых потоков:

ksql> DESCRIBE TYPE_1;

Name                 : TYPE_1
Field    | Type
--------------------------------------
ROWTIME  | BIGINT           (system)
ROWKEY   | VARCHAR(STRING)  (system)
DATA__TS | VARCHAR(STRING)
DATA__A  | INTEGER
DATA__B  | VARCHAR(STRING)
--------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> DESCRIBE TYPE_2;

Name                 : TYPE_2
Field    | Type
--------------------------------------
ROWTIME  | BIGINT           (system)
ROWKEY   | VARCHAR(STRING)  (system)
DATA__TS | VARCHAR(STRING)
DATA__A  | INTEGER
DATA__C  | INTEGER
DATA__D  | VARCHAR(STRING)
--------------------------------------

Темы лежат в основе каждого потока KSQL:

ksql> LIST TOPICS;

Kafka Topic                 | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
t_raw                       | true       | 1          | 1                  | 2         | 2
TYPE_1                      | true       | 4          | 1                  | 0         | 0
TYPE_2                      | true       | 4          | 1                  | 0         | 0
---------------------------------------------------------------------------------------------------------
...