Проблема в том, что вам нужно правильно установить ключ сообщения Kafka. Вы также не можете указать два поля в предложении KEY
. Подробнее об этом здесь
Вот пример того, как это сделать.
Сначала загрузите тестовые данные:
kafkacat -b kafka-1:39092 -P -t ksqldb_topic_01 <<EOF
{"city":"Madrid","temperature":20,"sensorId":"sensor03"}
{"city":"Madrid","temperature":5,"sensorId":"sensor03"}
{"city":"Sevilla","temperature":10,"sensorId":"sensor01"}
{"city":"Sevilla","temperature":15,"sensorId":"sensor01"}
{"city":"Sevilla","temperature":20,"sensorId":"sensor03"}
{"city":"Sevilla","temperature":5,"sensorId":"sensor01"}
{"city":"Sevilla","temperature":5,"sensorId":"sensor02"}
{"city":"Valencia","temperature":15,"sensorId":"sensor02"}
{"city":"Valencia","temperature":15,"sensorId":"sensor03"}
EOF
Теперь в ksqlDB объявите схему над topi c - как поток, потому что нам нужно перераспределить данные, чтобы добавить ключ , Если вы управляете продюсером topi c, то, возможно, вы сделаете это в восходящем направлении и сохраните шаг.
CREATE STREAM sensor_data_raw (city VARCHAR, temperature DOUBLE, sensorId VARCHAR)
WITH (KAFKA_TOPIC='ksqldb_topic_01', VALUE_FORMAT='JSON');
Повторное разделение данных на основе составного ключа.
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM sensor_data_repartitioned WITH (VALUE_FORMAT='AVRO') AS
SELECT *
FROM sensor_data_raw
PARTITION BY city+sensorId;
Две вещи, на которые стоит обратить внимание:
- Я пользуюсь возможностью для повторной сериализации в Avro - если вы предпочитаете сохранять JSON, просто опустите
WITH (VALUE_FORMAT
пункт. - Когда данные перераспределяются, гарантии упорядочения теряются, поэтому теоретически после этого вы можете получить неупорядоченные события.
В этот момент мы можем проверить преобразованные c:
ksql> PRINT SENSOR_DATA_REPARTITIONED FROM BEGINNING LIMIT 5;
Format:AVRO
1/24/20 9:55:54 AM UTC, Madridsensor03, {"CITY": "Madrid", "TEMPERATURE": 20.0, "SENSORID": "sensor03"}
1/24/20 9:55:54 AM UTC, Madridsensor03, {"CITY": "Madrid", "TEMPERATURE": 5.0, "SENSORID": "sensor03"}
1/24/20 9:55:54 AM UTC, Sevillasensor01, {"CITY": "Sevilla", "TEMPERATURE": 10.0, "SENSORID": "sensor01"}
1/24/20 9:55:54 AM UTC, Sevillasensor01, {"CITY": "Sevilla", "TEMPERATURE": 15.0, "SENSORID": "sensor01"}
1/24/20 9:55:54 AM UTC, Sevillasensor03, {"CITY": "Sevilla", "TEMPERATURE": 20.0, "SENSORID": "sensor03"}
Обратите внимание, что ключ в сообщении Kafka (второе поле после метки времени) теперь установлен правильно, по сравнению с исходными данными, которые не имели ключа:
ksql> PRINT ksqldb_topic_01 FROM BEGINNING LIMIT 5;
Format:JSON
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Madrid","temperature":20,"sensorId":"sensor03"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Madrid","temperature":5,"sensorId":"sensor03"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Sevilla","temperature":10,"sensorId":"sensor01"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Sevilla","temperature":15,"sensorId":"sensor01"}
{"ROWTIME":1579859380123,"ROWKEY":"null","city":"Sevilla","temperature":20,"sensorId":"sensor03"}
Теперь мы можем объявить таблицу поверх перераспределенных данных. Поскольку сейчас я использую Avro, мне не нужно повторно вводить схему. Если бы я использовал JSON, мне нужно было бы ввести его снова как часть этого DDL.
CREATE TABLE ultimo_resgistro WITH (KAFKA_TOPIC='SENSOR_DATA_REPARTITIONED', VALUE_FORMAT='AVRO');
Ключ таблицы неявно берется из ROWKEY
, который является ключом сообщения Kafka.
ksql> SELECT ROWKEY, CITY, SENSORID, TEMPERATURE FROM ULTIMO_RESGISTRO EMIT CHANGES;
+------------------+----------+----------+-------------+
|ROWKEY |CITY |SENSORID |TEMPERATURE |
+------------------+----------+----------+-------------+
|Madridsensor03 |Madrid |sensor03 |5.0 |
|Sevillasensor03 |Sevilla |sensor03 |20.0 |
|Sevillasensor01 |Sevilla |sensor01 |5.0 |
|Sevillasensor02 |Sevilla |sensor02 |5.0 |
|Valenciasensor02 |Valencia |sensor02 |15.0 |
|Valenciasensor03 |Valencia |sensor03 |15.0 |
Если вы хотите воспользоваться опрашивающими запросами (чтобы получить последнее значение), вам нужно go и повысить голосование (или добавить PR ?) эту проблему .