Как создать таблицу K SQL из топи c с помощью составного ключа? - PullRequest
0 голосов
/ 08 января 2020

У меня есть некоторые данные topi c с полями stringA stringB, и я просто пытаюсь использовать их в качестве ключа при создании таблицы K SQL из топи c.

1 Ответ

0 голосов
/ 09 января 2020

Вот пример. Сначала я создам и заполню тестовый поток

ksql> CREATE STREAM TEST (STRINGA VARCHAR, 
                          STRINGB VARCHAR, 
                          COL3 INT) 
                    WITH (KAFKA_TOPIC='TEST',
                          PARTITIONS=1,
                          VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('A','B',1);
ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('A','B',2);
ksql> INSERT INTO TEST (STRINGA, STRINGB, COL3) VALUES ('C','D',3);
ksql>
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

ksql> SELECT * FROM TEST EMIT CHANGES LIMIT 3;
+--------------+--------+---------+----------+------+
|ROWTIME       |ROWKEY  |STRINGA  |STRINGB   |COL3  |
+--------------+--------+---------+----------+------+
|1578569329184 |null    |A        |B         |1     |
|1578569331653 |null    |A        |B         |2     |
|1578569339177 |null    |C        |D         |3     |

Обратите внимание, что ROWKEY равно нулю.

Теперь я создам новый поток, заполненный первым, и создам составной столбец, установив его в качестве ключа. Я также включаю сами исходные поля, но это необязательно, если они вам не нужны:

ksql> CREATE STREAM TEST_REKEY AS 
        SELECT STRINGA + STRINGB AS MY_COMPOSITE_KEY, 
               STRINGA, 
               STRINGB, 
               COL3 
          FROM TEST 
          PARTITION BY MY_COMPOSITE_KEY ;

 Message
------------------------------------------------------------------------------------------
 Stream TEST_REKEY created and running. Created by query with query ID: CSAS_TEST_REKEY_9
------------------------------------------------------------------------------------------

Теперь у вас есть поток данных с ключом, установленным на ваш составной ключ:

ksql> SELECT ROWKEY , COL3 FROM TEST_REKEY EMIT CHANGES LIMIT 3;
+---------+-------+
|ROWKEY   |COL3   |
+---------+-------+
|AB       |1      |
|AB       |2      |
|CD       |3      |
Limit Reached
Query terminated

Вы также можете проверить базовую топику Кафки c, чтобы проверить ключ:

ksql> PRINT TEST_REKEY LIMIT 3;
Format:JSON
{"ROWTIME":1578569329184,"ROWKEY":"AB","MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":1}
{"ROWTIME":1578569331653,"ROWKEY":"AB","MY_COMPOSITE_KEY":"AB","STRINGA":"A","STRINGB":"B","COL3":2}
{"ROWTIME":1578569339177,"ROWKEY":"CD","MY_COMPOSITE_KEY":"CD","STRINGA":"C","STRINGB":"D","COL3":3}
ksql>

После этого мы можем объявить таблицу поверх перекодированных топи c:

CREATE TABLE TEST_TABLE (ROWKEY VARCHAR KEY, 
                         COL3 INT) 
    WITH (KAFKA_TOPIC='TEST_REKEY', VALUE_FORMAT='JSON');

Из этой таблицы мы можем запросить состояние. Обратите внимание, что составной ключ AB показывает только последнее значение, которое является частью семантики таблицы (сравните с потоком выше, в котором вы видите оба значения - и поток, и таблица - это одно и то же Kafka topi c) :

ksql> SELECT * FROM TEST_TABLE EMIT CHANGES;
+----------------+---------+------+
|ROWTIME         |ROWKEY   |COL3  |
+----------------+---------+------+
|1578569331653   |AB       |2     |
|1578569339177   |CD       |3     |
...