Да, вы можете сделать это - KSQL не возражает, если столбец не существует, вы просто получаете значение null
.
Настройка тестовых данных
Заполните некоторые тестовые данные в теме:
kafkacat -b kafka:29092 -t t_raw -P <<EOF
{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
EOF
Вывести тему в консоль KSQL для проверки:
ksql> PRINT 't_raw' FROM BEGINNING;
Format:JSON
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"ROWTIME":1542965737437,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
^CTopic printing ceased
ksql>
Смоделируйте поток данных источника
Создайте поток поверх него.Обратите внимание на использование STRUCT
и ссылку на каждый возможный столбец:
CREATE STREAM T (TYPE VARCHAR, \
DATA STRUCT< \
TS VARCHAR, \
A INT, \
B VARCHAR, \
C INT, \
D VARCHAR>) \
WITH (KAFKA_TOPIC='t_raw',\
VALUE_FORMAT='JSON');
Установите смещение на самое раннее, чтобы мы запросили всю тему, а затем используйте KSQL для доступа к полному потоку:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql>
ksql> SELECT * FROM T;
1542965737436 | null | key1 | {TS=2018-11-20 19:20:21.1, A=1, B=hello, C=null, D=null}
1542965737436 | null | key2 | {TS=2018-11-20 19:20:22.2, A=1, B=null, C=11, D=goodbye}
1542965737436 | null | key1 | {TS=2018-11-20 19:20:23.3, A=2, B=hello2, C=null, D=null}
1542965737437 | null | key2 | {TS=2018-11-20 19:20:24.4, A=3, B=null, C=22, D=goodbye2}
^CQuery terminated
Запросите типы по отдельности, используя оператор ->
для доступа к вложенным элементам:
ksql> SELECT DATA->A,DATA->B FROM T WHERE TYPE='key1' LIMIT 2;
1 | hello
2 | hello2
ksql> SELECT DATA->A,DATA->C,DATA->D FROM T WHERE TYPE='key2' LIMIT 2;
1 | 11 | goodbye
3 | 22 | goodbye2
Сохраните данные в отдельных темах Kafka:
Заполните целевые темы разделеннымиданные:
ksql> CREATE STREAM TYPE_1 AS SELECT DATA->TS, DATA->A, DATA->B FROM T WHERE TYPE='key1';
Message
----------------------------
Stream created and running
----------------------------
ksql> CREATE STREAM TYPE_2 AS SELECT DATA->TS, DATA->A, DATA->C, DATA->D FROM T WHERE TYPE='key2';
Message
----------------------------
Stream created and running
----------------------------
Схема для новых потоков:
ksql> DESCRIBE TYPE_1;
Name : TYPE_1
Field | Type
--------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
DATA__TS | VARCHAR(STRING)
DATA__A | INTEGER
DATA__B | VARCHAR(STRING)
--------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> DESCRIBE TYPE_2;
Name : TYPE_2
Field | Type
--------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
DATA__TS | VARCHAR(STRING)
DATA__A | INTEGER
DATA__C | INTEGER
DATA__D | VARCHAR(STRING)
--------------------------------------
Темы лежат в основе каждого потока KSQL:
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
t_raw | true | 1 | 1 | 2 | 2
TYPE_1 | true | 4 | 1 | 0 | 0
TYPE_2 | true | 4 | 1 | 0 | 0
---------------------------------------------------------------------------------------------------------