Сохраните последнюю запись в соответствии с составным ключом. ksqlDB 0.6.0 - PullRequest
0 голосов
/ 24 января 2020

У меня есть топик Кафки c со следующим потоком данных (ksqldb_topic_01):

% Reached end of topic ksqldb_topic_01 [0] at offset 213
{"city":"Sevilla","temperature":20,"sensorId":"sensor03"}
% Reached end of topic ksqldb_topic_01 [0] at offset 214
{"city":"Madrid","temperature":5,"sensorId":"sensor03"}
% Reached end of topic ksqldb_topic_01 [0] at offset 215
{"city":"Sevilla","temperature":10,"sensorId":"sensor01"}
% Reached end of topic ksqldb_topic_01 [0] at offset 216
{"city":"Valencia","temperature":15,"sensorId":"sensor03"}
% Reached end of topic ksqldb_topic_01 [0] at offset 217
{"city":"Sevilla","temperature":15,"sensorId":"sensor01"}
% Reached end of topic ksqldb_topic_01 [0] at offset 218
{"city":"Madrid","temperature":20,"sensorId":"sensor03"}
% Reached end of topic ksqldb_topic_01 [0] at offset 219
{"city":"Valencia","temperature":15,"sensorId":"sensor02"}
% Reached end of topic ksqldb_topic_01 [0] at offset 220
{"city":"Sevilla","temperature":5,"sensorId":"sensor02"}
% Reached end of topic ksqldb_topic_01 [0] at offset 221
{"city":"Sevilla","temperature":5,"sensorId":"sensor01"}
% Reached end of topic ksqldb_topic_01 [0] at offset 222

И я хочу сохранить в таблице последнее значение, которое вводит меня в топи c, для каждого города и датчика ID

В моей ksqldDB я создаю следующую таблицу:

CREATE TABLE ultimo_resgistro(city VARCHAR,sensorId VARCHAR,temperature INTEGER) WITH (KAFKA_TOPIC='ksqldb_topic_01', VALUE_FORMAT='json',KEY = 'sensorId,city');
DESCRIBE EXTENDED ULTIMO_RESGISTRO;

Name                 : ULTIMO_RESGISTRO
Type                 : TABLE
Key field            : SENSORID
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : ksqldb_topic_01 (partitions: 1, replication: 1)

 Field       | Type                      
-----------------------------------------
 ROWTIME     | BIGINT           (system) 
 ROWKEY      | VARCHAR(STRING)  (system) 
 CITY        | VARCHAR(STRING)           
 SENSORID    | VARCHAR(STRING)           
 TEMPERATURE | INTEGER                   
-----------------------------------------

Видя, что данные обрабатывают меня

select * from ultimo_resgistro emit changes;
+------------------+------------------+------------------+------------------+------------------+
|ROWTIME           |ROWKEY            |CITY              |SENSORID          |TEMPERATURE       |
+------------------+------------------+------------------+------------------+------------------+
key cannot be null
Query terminated

1 Ответ

1 голос
/ 24 января 2020

Проблема в том, что вам нужно правильно установить ключ сообщения Kafka. Вы также не можете указать два поля в предложении KEY. Подробнее об этом здесь

Вот пример того, как это сделать.

Сначала загрузите тестовые данные:

kafkacat -b kafka-1:39092 -P -t ksqldb_topic_01 <<EOF
{"city":"Madrid","temperature":20,"sensorId":"sensor03"}
{"city":"Madrid","temperature":5,"sensorId":"sensor03"}
{"city":"Sevilla","temperature":10,"sensorId":"sensor01"}
{"city":"Sevilla","temperature":15,"sensorId":"sensor01"}
{"city":"Sevilla","temperature":20,"sensorId":"sensor03"}
{"city":"Sevilla","temperature":5,"sensorId":"sensor01"}
{"city":"Sevilla","temperature":5,"sensorId":"sensor02"}
{"city":"Valencia","temperature":15,"sensorId":"sensor02"}
{"city":"Valencia","temperature":15,"sensorId":"sensor03"}
EOF

Теперь в ksqlDB объявите схему над topi c - как поток, потому что нам нужно перераспределить данные, чтобы добавить ключ , Если вы управляете продюсером topi c, то, возможно, вы сделаете это в восходящем направлении и сохраните шаг.

CREATE STREAM sensor_data_raw (city VARCHAR, temperature DOUBLE, sensorId VARCHAR) 
    WITH (KAFKA_TOPIC='ksqldb_topic_01', VALUE_FORMAT='JSON');

Повторное разделение данных на основе составного ключа.

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM sensor_data_repartitioned WITH (VALUE_FORMAT='AVRO') AS
    SELECT *
      FROM sensor_data_raw
    PARTITION BY city+sensorId;

Две вещи, на которые стоит обратить внимание:

  1. Я пользуюсь возможностью для повторной сериализации в Avro - если вы предпочитаете сохранять JSON, просто опустите WITH (VALUE_FORMAT пункт.
  2. Когда данные перераспределяются, гарантии упорядочения теряются, поэтому теоретически после этого вы можете получить неупорядоченные события.

В этот момент мы можем проверить преобразованные c:

ksql> PRINT SENSOR_DATA_REPARTITIONED FROM BEGINNING LIMIT 5;
Format:AVRO
1/24/20 9:55:54 AM UTC, Madridsensor03, {"CITY": "Madrid", "TEMPERATURE": 20.0, "SENSORID": "sensor03"}
1/24/20 9:55:54 AM UTC, Madridsensor03, {"CITY": "Madrid", "TEMPERATURE": 5.0, "SENSORID": "sensor03"}
1/24/20 9:55:54 AM UTC, Sevillasensor01, {"CITY": "Sevilla", "TEMPERATURE": 10.0, "SENSORID": "sensor01"}
1/24/20 9:55:54 AM UTC, Sevillasensor01, {"CITY": "Sevilla", "TEMPERATURE": 15.0, "SENSORID": "sensor01"}
1/24/20 9:55:54 AM UTC, Sevillasensor03, {"CITY": "Sevilla", "TEMPERATURE": 20.0, "SENSORID": "sensor03"}

Обратите внимание, что ключ в сообщении Kafka (второе поле после метки времени) теперь установлен правильно, по сравнению с исходными данными, которые не имели ключа:

ksql> PRINT ksqldb_topic_01 FROM BEGINNING LIMIT 5;
Format:JSON
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Madrid","temperature":20,"sensorId":"sensor03"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Madrid","temperature":5,"sensorId":"sensor03"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Sevilla","temperature":10,"sensorId":"sensor01"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Sevilla","temperature":15,"sensorId":"sensor01"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Sevilla","temperature":20,"sensorId":"sensor03"}

Теперь мы можем объявить таблицу поверх перераспределенных данных. Поскольку сейчас я использую Avro, мне не нужно повторно вводить схему. Если бы я использовал JSON, мне нужно было бы ввести его снова как часть этого DDL.

CREATE TABLE ultimo_resgistro WITH (KAFKA_TOPIC='SENSOR_DATA_REPARTITIONED', VALUE_FORMAT='AVRO');

Ключ таблицы неявно берется из ROWKEY, который является ключом сообщения Kafka.

ksql> SELECT ROWKEY, CITY, SENSORID, TEMPERATURE FROM ULTIMO_RESGISTRO EMIT CHANGES;
+------------------+----------+----------+-------------+
|ROWKEY            |CITY      |SENSORID  |TEMPERATURE  |
+------------------+----------+----------+-------------+
|Madridsensor03    |Madrid    |sensor03  |5.0          |
|Sevillasensor03   |Sevilla   |sensor03  |20.0         |
|Sevillasensor01   |Sevilla   |sensor01  |5.0          |
|Sevillasensor02   |Sevilla   |sensor02  |5.0          |
|Valenciasensor02  |Valencia  |sensor02  |15.0         |
|Valenciasensor03  |Valencia  |sensor03  |15.0         |

Если вы хотите воспользоваться опрашивающими запросами (чтобы получить последнее значение), вам нужно go и повысить голосование (или добавить PR ?) эту проблему .

...