KSQL Как объединить тему данных датчика с темой индикатора без общего идентификатора - PullRequest
0 голосов
/ 06 ноября 2018

У меня есть поток данных в мои темы с сервера датчиков, который я не могу контролировать.

В теме A имеется несколько полезных данных, поступающих с датчиков (a, b, c, d ...).

В теме B появляются входящие сообщения-индикаторы (например, 1,2, ..), сообщающие, что теперь поступающие данные датчика из темы A принадлежат новому объекту x вместо x-1

Я хочу объединить данные из Темы A, соответствующие текущему на данный момент объекту из Темы B.

Я довольно плохо знаком с KSQL и потоковой логикой, поэтому я не знаю, возможно ли это. Такое ощущение, что может быть довольно простое решение, но я не нашел ничего подобного в примерах.

РЕДАКТИРОВАТЬ:

данные датчика (тема A) могут выглядеть следующим образом:

sensorPath                        timestamp  value
simulation/machine/plc/sensor-1 | 1 |        7.0
simulation/machine/plc/sensor-2 | 1 |        2.0
simulation/machine/plc/sensor-1 | 2 |        6.0
simulation/machine/plc/sensor-2 | 2 |        1.0
...
simulation/machine/plc/sensor-1 | 10 |       10.0
simulation/machine/plc/sensor-2 | 10 |       12.0

данные индикатора (тема B) могут выглядеть следующим образом

informationPath                timestamp   WorkpieceID
simulation/informationString | 1  |        0020181
simulation/informationString | 10 |        0020182

Я хочу сопоставить данные датчика с соответствующей деталью в новой теме / потоке. Новые поступающие данные датчика всегда принадлежат новейшей информационной строке / заготовке.

Итак, тема C должна выглядеть так:

sensorPath                        SensorTimestamp  value WorkpieceID
simulation/machine/plc/sensor-1 | 1 |              7.0 | 0020181
simulation/machine/plc/sensor-2 | 1 |              2.0 | 0020181             
simulation/machine/plc/sensor-1 | 2 |              6.0 | 0020181
simulation/machine/plc/sensor-2 | 2 |              1.0 | 0020181
...
simulation/machine/plc/sensor-1 | 10 |             10.0| 0020182
simulation/machine/plc/sensor-2 | 10 |             12.0| 0020182

Так что мне нужно что-то вроде соединения по topicA.timestamp> = current (topicB.timestamp)?!

1 Ответ

0 голосов
/ 08 ноября 2018

Да, вы можете сделать это с KSQL. Вот рабочий пример. Я использую этот файл docker-compose здесь для моей тестовой среды, если вы хотите воспроизвести приведенный ниже пример.

Во-первых, я заполняю некоторые тестовые данные на основе предоставленного вами образца. Я составил временные метки на основе текущей эпохи, +2 и +10 секунд:

  • Данные испытаний датчика:

    docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t sensor -P <<EOF
    {"sensorPath":"simulation/machine/plc/sensor-1","value":7.0,"timestamp":1541623171000}
    {"sensorPath":"simulation/machine/plc/sensor-2","value":2.0,"timestamp":1541623171000}
    {"sensorPath":"simulation/machine/plc/sensor-1","value":6.0,"timestamp":1541623231000}
    {"sensorPath":"simulation/machine/plc/sensor-2","value":1.0,"timestamp":1541623231000}
    {"sensorPath":"simulation/machine/plc/sensor-1","value":10.0,"timestamp":1541623771000}
    {"sensorPath":"simulation/machine/plc/sensor-2","value":12.0,"timestamp":1541623771000}
    EOF
    
  • данные испытаний индикатора:

    docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t indicator -P << EOF
    {"informationPath":"simulation/informationString","WorkpieceID":"0020181","timestamp":1541623171000}
    {"informationPath":"simulation/informationString","WorkpieceID":"0020182","timestamp":1541623771000}
    EOF
    

Теперь я запускаю KSQL CLI:

docker run --network cos_default --interactive --tty --rm \
    confluentinc/cp-ksql-cli:5.0.0 \
    http://ksql-server:8088

В KSQL мы можем изучить исходные данные в теме:

KSQL> PRINT 'sensor' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":7.0,"timestamp":1541623171000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":2.0,"timestamp":1541623171000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":6.0,"timestamp":1541623231000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":1.0,"timestamp":1541623231000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":10.0,"timestamp":1541623771000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":12.0,"timestamp":1541623771000}

KSQL> PRINT 'indicator' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541624851692,"ROWKEY":"null","informationPath":"simulation/informationString","WorkpieceID":"0020181","timestamp":1541623171000}
{"ROWTIME":1541624851692,"ROWKEY":"null","informationPath":"simulation/informationString","WorkpieceID":"0020182","timestamp":1541623771000}

Теперь мы регистрируем тему для использования в KSQL и объявляем схему:

ksql> CREATE STREAM SENSOR (SENSORPATH VARCHAR, VALUE DOUBLE, TIMESTAMP BIGINT) WITH (VALUE_FORMAT='JSON',KAFKA_TOPIC='sensor',TIMESTAMP='timestamp');

Message
----------------
Stream created
----------------
ksql> CREATE STREAM INDICATOR (INFORMATIONPATH VARCHAR, WORKPIECEID VARCHAR, TIMESTAMP BIGINT) WITH (VALUE_FORMAT='JSON',KAFKA_TOPIC='indicator',TIMESTAMP='timestamp');

Message
----------------
Stream created
----------------

Мы можем запросить созданные потоки KSQL:

ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT ROWTIME, timestamp, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd HH:mm:ss Z') , sensorpath, value FROM sensor;
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/machine/plc/sensor-1 | 7.0
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/machine/plc/sensor-2 | 2.0
1541623231000 | 1541623231000 | 2018-11-07 20:40:31 +0000 | 2018-11-07 20:40:31 +0000 | simulation/machine/plc/sensor-1 | 6.0
1541623231000 | 1541623231000 | 2018-11-07 20:40:31 +0000 | 2018-11-07 20:40:31 +0000 | simulation/machine/plc/sensor-2 | 1.0
1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/machine/plc/sensor-1 | 10.0
1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/machine/plc/sensor-2 | 12.0

ksql> SELECT ROWTIME, timestamp, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd HH:mm:ss Z') , informationPath, WorkpieceID FROM indicator;
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/informationString | 0020181
1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/informationString | 0020182

Обратите внимание, что ROWTIME STREAM отличается от ROWTIME на выходе PRINT. Это связано с тем, что вывод PRINT показывает временную метку сообщения Kafka, тогда как в STREAM мы переопределили временную метку в предложении WITH, чтобы вместо этого использовать столбец timestamp из самой полезной нагрузки сообщения.

Чтобы объединить две темы, мы собираемся сделать две вещи:

  1. Создайте искусственный ключ для присоединения к ним, поскольку в настоящее время в данных нет ни одного Мы также применим этот новый столбец в качестве ключа сообщения Kafka (которое необходимо для объединения).
  2. Смоделируйте поток событий «индикатор» как таблицу KSQL . Это позволяет нам запросить текущее состояние значения WorkpieceID на основе отметки времени

Чтобы добавить искусственный ключ объединения, просто выберите константу и введите ее псевдоним с предложением AS и используйте его в качестве ключа сообщения с PARTITION BY:

ksql> CREATE STREAM SENSOR_KEYED AS SELECT sensorPath, value, 'X' AS JOIN_KEY FROM sensor PARTITION BY JOIN_KEY;

Message
----------------------------
Stream created and running
----------------------------

Для интереса мы можем проверить созданную тему Кафки

ksql> PRINT SENSOR_KEYED FROM BEGINNING;
Format:JSON
{"ROWTIME":1541623171000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":7.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623171000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":2.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623231000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":6.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623231000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":1.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623771000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":10.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623771000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":12.0,"JOIN_KEY":"X"}

Обратите внимание, что ROWKEY теперь является JOIN_KEY, а не NULL, как указано выше в выводе PRINT 'sensor'. Если вы пропустите PARTITION BY, тогда будет добавлен JOIN_KEY, но сообщения останутся без ключа, а это не то, что мы хотим, чтобы объединение работало.

Теперь мы также повторно вводим данные индикатора:

ksql> CREATE STREAM INDICATOR_KEYED AS SELECT informationPath, WorkpieceID, 'X' as JOIN_KEY FROM indicator PARTITION BY JOIN_KEY;

Message
----------------------------
Stream created and running
----------------------------
ksql> PRINT 'INDICATOR_KEYED' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541623171000,"ROWKEY":"X","INFORMATIONPATH":"simulation/informationString","WORKPIECEID":"0020181","JOIN_KEY":"X"}
{"ROWTIME":1541623771000,"ROWKEY":"X","INFORMATIONPATH":"simulation/informationString","WORKPIECEID":"0020182","JOIN_KEY":"X"}

Перепечатав данные индикатора, мы можем теперь зарегистрировать его как таблицу KSQL. В таблице KSQL возвращает состояние ключа вместо каждого события . Мы используем этот подход, чтобы определить WorkpieceID, который будет связан с показаниями датчика, основываясь на отметке времени.

ksql> CREATE TABLE INDICATOR_STATE (JOIN_KEY VARCHAR, informationPath varchar, WorkpieceID varchar) with (value_format='json',kafka_topic='INDICATOR_KEYED',KEY='JOIN_KEY');

Message
---------------
Table created
---------------

Запрос к таблице показывает одно значение, которое является текущим состоянием:

ksql> SELECT * FROM INDICATOR_STATE;
1541623771000 | X | X | simulation/informationString | 0020182

Если в этот момент вы отправите еще одно сообщение в тему indicator, состояние таблицы обновится, и вы увидите новую строку, выданную из SELECT.

Наконец, мы можем выполнить объединение потоковой таблицы, сохраненное в новой теме:

ksql> CREATE STREAM SENSOR_ENRICHED AS SELECT S.SENSORPATH, TIMESTAMPTOSTRING(S.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS SENSOR_TIMESTAMP, S.VALUE, I.WORKPIECEID FROM SENSOR_KEYED S LEFT JOIN INDICATOR_STATE I ON S.JOIN_KEY=I.JOIN_KEY;

Message
----------------------------
Stream created and running
----------------------------

Изучите новый поток:

ksql> DESCRIBE SENSOR_ENRICHED;

Name                 : SENSOR_ENRICHED
Field            | Type
----------------------------------------------
ROWTIME          | BIGINT           (system)
ROWKEY           | VARCHAR(STRING)  (system)
SENSORPATH       | VARCHAR(STRING)
SENSOR_TIMESTAMP | VARCHAR(STRING)
VALUE            | DOUBLE
WORKPIECEID      | VARCHAR(STRING)
----------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

Запрос нового потока:

ksql> SELECT SENSORPATH, SENSOR_TIMESTAMP, VALUE, WORKPIECEID FROM SENSOR_ENRICHED;
simulation/machine/plc/sensor-1 | 2018-11-07 20:39:31 +0000 | 7.0 | 0020181
simulation/machine/plc/sensor-2 | 2018-11-07 20:39:31 +0000 | 2.0 | 0020181
simulation/machine/plc/sensor-1 | 2018-11-07 20:40:31 +0000 | 6.0 | 0020181
simulation/machine/plc/sensor-2 | 2018-11-07 20:40:31 +0000 | 1.0 | 0020181
simulation/machine/plc/sensor-1 | 2018-11-07 20:49:31 +0000 | 10.0 | 0020182
simulation/machine/plc/sensor-2 | 2018-11-07 20:49:31 +0000 | 12.0 | 0020182

Поскольку это KSQL, поток SENSOR_ENRICHED (и основная тема с одним и тем же именем) будет постоянно заполняться событиями, приходящими на тему sensor и отражающими любые изменения состояния на основе событий, отправленных на indicator тема.

...