Да, вы можете сделать это с KSQL. Вот рабочий пример. Я использую этот файл docker-compose здесь для моей тестовой среды, если вы хотите воспроизвести приведенный ниже пример.
Во-первых, я заполняю некоторые тестовые данные на основе предоставленного вами образца. Я составил временные метки на основе текущей эпохи, +2 и +10 секунд:
Данные испытаний датчика:
docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t sensor -P <<EOF
{"sensorPath":"simulation/machine/plc/sensor-1","value":7.0,"timestamp":1541623171000}
{"sensorPath":"simulation/machine/plc/sensor-2","value":2.0,"timestamp":1541623171000}
{"sensorPath":"simulation/machine/plc/sensor-1","value":6.0,"timestamp":1541623231000}
{"sensorPath":"simulation/machine/plc/sensor-2","value":1.0,"timestamp":1541623231000}
{"sensorPath":"simulation/machine/plc/sensor-1","value":10.0,"timestamp":1541623771000}
{"sensorPath":"simulation/machine/plc/sensor-2","value":12.0,"timestamp":1541623771000}
EOF
данные испытаний индикатора:
docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t indicator -P << EOF
{"informationPath":"simulation/informationString","WorkpieceID":"0020181","timestamp":1541623171000}
{"informationPath":"simulation/informationString","WorkpieceID":"0020182","timestamp":1541623771000}
EOF
Теперь я запускаю KSQL CLI:
docker run --network cos_default --interactive --tty --rm \
confluentinc/cp-ksql-cli:5.0.0 \
http://ksql-server:8088
В KSQL мы можем изучить исходные данные в теме:
KSQL> PRINT 'sensor' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":7.0,"timestamp":1541623171000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":2.0,"timestamp":1541623171000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":6.0,"timestamp":1541623231000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":1.0,"timestamp":1541623231000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":10.0,"timestamp":1541623771000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":12.0,"timestamp":1541623771000}
KSQL> PRINT 'indicator' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541624851692,"ROWKEY":"null","informationPath":"simulation/informationString","WorkpieceID":"0020181","timestamp":1541623171000}
{"ROWTIME":1541624851692,"ROWKEY":"null","informationPath":"simulation/informationString","WorkpieceID":"0020182","timestamp":1541623771000}
Теперь мы регистрируем тему для использования в KSQL и объявляем схему:
ksql> CREATE STREAM SENSOR (SENSORPATH VARCHAR, VALUE DOUBLE, TIMESTAMP BIGINT) WITH (VALUE_FORMAT='JSON',KAFKA_TOPIC='sensor',TIMESTAMP='timestamp');
Message
----------------
Stream created
----------------
ksql> CREATE STREAM INDICATOR (INFORMATIONPATH VARCHAR, WORKPIECEID VARCHAR, TIMESTAMP BIGINT) WITH (VALUE_FORMAT='JSON',KAFKA_TOPIC='indicator',TIMESTAMP='timestamp');
Message
----------------
Stream created
----------------
Мы можем запросить созданные потоки KSQL:
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT ROWTIME, timestamp, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd HH:mm:ss Z') , sensorpath, value FROM sensor;
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/machine/plc/sensor-1 | 7.0
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/machine/plc/sensor-2 | 2.0
1541623231000 | 1541623231000 | 2018-11-07 20:40:31 +0000 | 2018-11-07 20:40:31 +0000 | simulation/machine/plc/sensor-1 | 6.0
1541623231000 | 1541623231000 | 2018-11-07 20:40:31 +0000 | 2018-11-07 20:40:31 +0000 | simulation/machine/plc/sensor-2 | 1.0
1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/machine/plc/sensor-1 | 10.0
1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/machine/plc/sensor-2 | 12.0
ksql> SELECT ROWTIME, timestamp, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd HH:mm:ss Z') , informationPath, WorkpieceID FROM indicator;
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/informationString | 0020181
1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/informationString | 0020182
Обратите внимание, что ROWTIME
STREAM отличается от ROWTIME
на выходе PRINT
. Это связано с тем, что вывод PRINT
показывает временную метку сообщения Kafka, тогда как в STREAM мы переопределили временную метку в предложении WITH
, чтобы вместо этого использовать столбец timestamp
из самой полезной нагрузки сообщения.
Чтобы объединить две темы, мы собираемся сделать две вещи:
- Создайте искусственный ключ для присоединения к ним, поскольку в настоящее время в данных нет ни одного Мы также применим этот новый столбец в качестве ключа сообщения Kafka (которое необходимо для объединения).
- Смоделируйте поток событий «индикатор» как таблицу KSQL . Это позволяет нам запросить текущее состояние значения
WorkpieceID
на основе отметки времени
Чтобы добавить искусственный ключ объединения, просто выберите константу и введите ее псевдоним с предложением AS
и используйте его в качестве ключа сообщения с PARTITION BY
:
ksql> CREATE STREAM SENSOR_KEYED AS SELECT sensorPath, value, 'X' AS JOIN_KEY FROM sensor PARTITION BY JOIN_KEY;
Message
----------------------------
Stream created and running
----------------------------
Для интереса мы можем проверить созданную тему Кафки
ksql> PRINT SENSOR_KEYED FROM BEGINNING;
Format:JSON
{"ROWTIME":1541623171000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":7.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623171000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":2.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623231000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":6.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623231000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":1.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623771000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":10.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623771000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":12.0,"JOIN_KEY":"X"}
Обратите внимание, что ROWKEY теперь является JOIN_KEY, а не NULL, как указано выше в выводе PRINT 'sensor'
. Если вы пропустите PARTITION BY
, тогда будет добавлен JOIN_KEY, но сообщения останутся без ключа, а это не то, что мы хотим, чтобы объединение работало.
Теперь мы также повторно вводим данные индикатора:
ksql> CREATE STREAM INDICATOR_KEYED AS SELECT informationPath, WorkpieceID, 'X' as JOIN_KEY FROM indicator PARTITION BY JOIN_KEY;
Message
----------------------------
Stream created and running
----------------------------
ksql> PRINT 'INDICATOR_KEYED' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541623171000,"ROWKEY":"X","INFORMATIONPATH":"simulation/informationString","WORKPIECEID":"0020181","JOIN_KEY":"X"}
{"ROWTIME":1541623771000,"ROWKEY":"X","INFORMATIONPATH":"simulation/informationString","WORKPIECEID":"0020182","JOIN_KEY":"X"}
Перепечатав данные индикатора, мы можем теперь зарегистрировать его как таблицу KSQL. В таблице KSQL возвращает состояние ключа вместо каждого события . Мы используем этот подход, чтобы определить WorkpieceID
, который будет связан с показаниями датчика, основываясь на отметке времени.
ksql> CREATE TABLE INDICATOR_STATE (JOIN_KEY VARCHAR, informationPath varchar, WorkpieceID varchar) with (value_format='json',kafka_topic='INDICATOR_KEYED',KEY='JOIN_KEY');
Message
---------------
Table created
---------------
Запрос к таблице показывает одно значение, которое является текущим состоянием:
ksql> SELECT * FROM INDICATOR_STATE;
1541623771000 | X | X | simulation/informationString | 0020182
Если в этот момент вы отправите еще одно сообщение в тему indicator
, состояние таблицы обновится, и вы увидите новую строку, выданную из SELECT
.
Наконец, мы можем выполнить объединение потоковой таблицы, сохраненное в новой теме:
ksql> CREATE STREAM SENSOR_ENRICHED AS SELECT S.SENSORPATH, TIMESTAMPTOSTRING(S.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS SENSOR_TIMESTAMP, S.VALUE, I.WORKPIECEID FROM SENSOR_KEYED S LEFT JOIN INDICATOR_STATE I ON S.JOIN_KEY=I.JOIN_KEY;
Message
----------------------------
Stream created and running
----------------------------
Изучите новый поток:
ksql> DESCRIBE SENSOR_ENRICHED;
Name : SENSOR_ENRICHED
Field | Type
----------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
SENSORPATH | VARCHAR(STRING)
SENSOR_TIMESTAMP | VARCHAR(STRING)
VALUE | DOUBLE
WORKPIECEID | VARCHAR(STRING)
----------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Запрос нового потока:
ksql> SELECT SENSORPATH, SENSOR_TIMESTAMP, VALUE, WORKPIECEID FROM SENSOR_ENRICHED;
simulation/machine/plc/sensor-1 | 2018-11-07 20:39:31 +0000 | 7.0 | 0020181
simulation/machine/plc/sensor-2 | 2018-11-07 20:39:31 +0000 | 2.0 | 0020181
simulation/machine/plc/sensor-1 | 2018-11-07 20:40:31 +0000 | 6.0 | 0020181
simulation/machine/plc/sensor-2 | 2018-11-07 20:40:31 +0000 | 1.0 | 0020181
simulation/machine/plc/sensor-1 | 2018-11-07 20:49:31 +0000 | 10.0 | 0020182
simulation/machine/plc/sensor-2 | 2018-11-07 20:49:31 +0000 | 12.0 | 0020182
Поскольку это KSQL, поток SENSOR_ENRICHED
(и основная тема с одним и тем же именем) будет постоянно заполняться событиями, приходящими на тему sensor
и отражающими любые изменения состояния на основе событий, отправленных на indicator
тема.