Получить один поток ksql со значениями из двух потоков ksql - PullRequest
1 голос
/ 08 мая 2019

Я создаю один главный поток со значением в виде массива.Я хотел бы получить поля массива в одно и направить его в поток ne ksql.

Сначала я создаю основной поток:

CREATE STREAM runtime_master_stream
     (timestamp BIGINT,
      opcuaObject VARCHAR,
      value array<DOUBLE>)
   WITH (KAFKA_TOPIC='runtime_master', VALUE_FORMAT='JSON');

Вывод:

{"ROWTIME":1557317077577,"ROWKEY":"\u0000\u0000\u0001j�T�I","timestamp":1557317069589,"opcuaObject":"DatBetrZ.BetrZStdOM","value":[19.737154,512.0,320.18024,423.87027,399.99384,292.1198,450.821]}

Затем я создаю новый поток, чтобы получить поля массива в одном:

CREATE STREAM runtime_std_om_all_knife_stream
 WITH (TIMESTAMP='timestamp',
        PARTITIONS=4,
        VALUE_FORMAT='JSON') AS
 SELECT
        timestamp,
        opcuaObject,
        value[0] AS knife1,
    value[1] AS knife02,
    value[2] AS knife03,
    value[3] AS knife04,
    value[4] AS knife05,
        value[5] AS knife05,
    value[6] AS knife06
 FROM RUNTIME_MASTER_STREAM
 WHERE opcuaObject ='DatBetrZ.BetrZStdOM';

Вывод:

{"ROWTIME":1557317170312,"ROWKEY":"\u0000\u0000\u0001j�V4�","TIMESTAMP":1557317162337,"OPCUAOBJECT":"DatBetrZ.BetrZStdOM","KNIFE1":19.737154,"KNIFE02":512.0,"KNIFE03":320.18024,"KNIFE04":42
3.87027,"KNIFE05":400.02216,"KNIFE06":292.1198,}

Что мне нужно, это вывод в одном потоке ksql ПРИМЕР:

{"ROWTIME":1557317170312,"ROWKEY":"\u0000\u0000\u0001j�V4�","TIMESTAMP":1557317162337,"OPCUAOBJECT":"Knife1","VALUE":19.737154}

{"ROWTIME":1557317170313,"ROWKEY":"\u0000\u0000\u0001j�V4�","TIMESTAMP":1557317162337,"OPCUAOBJECT":"Knife02","VALUE":19.737154}

{"ROWTIME":1557317170312,"ROWKEY":"\u0000\u0000\u0001j�V4�","TIMESTAMP":1557317162337,"OPCUAOBJECT":"Knife03","VALUE":19.737154}

{"ROWTIME":1557317170313,"ROWKEY":"\u0000\u0000\u0001j�V4�","TIMESTAMP":1557317162337,"OPCUAOBJECT":"Knife04","VALUE":19.737154}

{"ROWTIME":1557317170312,"ROWKEY":"\u0000\u0000\u0001j�V4�","TIMESTAMP":1557317162337,"OPCUAOBJECT":"Knife05","VALUE":19.737154}

{"ROWTIME":1557317170313,"ROWKEY":"\u0000\u0000\u0001j�V4�","TIMESTAMP":1557317162337,"OPCUAOBJECT":"Knife06","VALUE":39.737154}

Есть идеи, как получить этот вывод одним потоком в kafka ksql?

1 Ответ

1 голос
/ 09 мая 2019

Вам нужна функция EXPLODE / UNNEST, которая в настоящее время недоступна, но для которой существует существующая проблема , которую вы можете отозвать, если считаете, что это будет полезно,

Обходным решением для вас может быть что-то вроде этого, когда вы грубо-принудительно заполняете целевой поток с помощью INSERT INTO, перебирая все возможные индексы вашего массива:

Заполняйте некоторые тестовые данныев тему Kafka

$ curl "https://api.mockaroo.com/api/440970e0?count=5&key=ff7856d0" | \
    kafkacat -P -b localhost -t car_data_01

Проверьте данные в KSQL:

ksql> PRINT 'car_data_01' FROM BEGINNING;
Format:JSON
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1533200557","car":"Oldsmobile","value":[68.93,53.58]}
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1548442477","car":"Mercury","value":[60.09,69.07,63.77,63.13]}
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1544928225","car":"Volkswagen","value":[59.77,6.94,97.7,30.86,16.9]}
{"ROWTIME":1557392065409,"ROWKEY":"null","timestamp":"1545383393","car":"Nissan","value":[13.32]}
{"ROWTIME":1557392065412,"ROWKEY":"null","timestamp":"1552825010","car":"Hyundai","value":[12.92]}

Создайте поток данных:

CREATE STREAM CAR_DATA (timestamp BIGINT, CAR VARCHAR, VALUE ARRAY<DOUBLE>) WITH (KAFKA_TOPIC='car_data_01', VALUE_FORMAT='JSON');
ksql> SELECT TIMESTAMP, CAR, VALUE[0], VALUE[1], VALUE[2] FROM CAR_DATA;
1533200557 | Oldsmobile | 68.93 | 53.58 | null
1548442477 | Mercury | 60.09 | 69.07 | 63.77
1544928225 | Volkswagen | 59.77 | 6.94 | 97.7
1545383393 | Nissan | 13.32 | null | null
1552825010 | Hyundai | 12.92 | null | null

Создайте выводпоток, для начала просто содержащий элементы нулевого индекса массива:

CREATE STREAM CAR_DATA_EXPLODED AS SELECT TIMESTAMP, CAR, 'Sensor 00' AS SOURCE, VALUE[0] AS VALUE FROM CAR_DATA WHERE VALUE[0] IS NOT NULL;
ksql> SELECT * FROM CAR_DATA_EXPLODED;
1557392065409 | null | 1544928225 | Volkswagen | Sensor 00 | 59.77
1557392065409 | null | 1545383393 | Nissan | Sensor 00 | 13.32
1557392065409 | null | 1533200557 | Oldsmobile | Sensor 00 | 68.93
1557392065412 | null | 1552825010 | Hyundai | Sensor 00 | 12.92
1557392065409 | null | 1548442477 | Mercury | Sensor 00 | 60.09

Вставьте остальные индексы массива в новый поток:

CREATE STREAM CAR_DATA_EXPLODED_00 AS SELECT TIMESTAMP, CAR, 'Sensor 00' AS SOURCE, VALUE[0] AS VALUE FROM CAR_DATA
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 01' AS SOURCE, VALUE[1] AS VALUE FROM CAR_DATA WHERE  VALUE[1] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 02' AS SOURCE, VALUE[2] AS VALUE FROM CAR_DATA WHERE  VALUE[2] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 03' AS SOURCE, VALUE[3] AS VALUE FROM CAR_DATA WHERE  VALUE[3] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 04' AS SOURCE, VALUE[4] AS VALUE FROM CAR_DATA WHERE  VALUE[4] IS NOT NULL;
INSERT INTO CAR_DATA_EXPLODED_00 SELECT TIMESTAMP, CAR, 'Sensor 05' AS SOURCE, VALUE[5] AS VALUE FROM CAR_DATA WHERE  VALUE[5] IS NOT NULL;

Изучите разнесенныйданные:

ksql> SELECT * FROM CAR_DATA_EXPLODED_00;
1557392065409 | null | 1533200557 | Oldsmobile | Sensor 00 | 68.93
1557392065409 | null | 1545383393 | Nissan | Sensor 00 | 13.32
1557392065409 | null | 1544928225 | Volkswagen | Sensor 00 | 59.77
1557392065409 | null | 1548442477 | Mercury | Sensor 02 | 63.77
1557392065409 | null | 1544928225 | Volkswagen | Sensor 03 | 30.86
1557392065409 | null | 1544928225 | Volkswagen | Sensor 04 | 16.9
1557392065409 | null | 1533200557 | Oldsmobile | Sensor 01 | 53.58
1557392065409 | null | 1544928225 | Volkswagen | Sensor 02 | 97.7
1557392065412 | null | 1552825010 | Hyundai | Sensor 00 | 12.92
1557392065409 | null | 1548442477 | Mercury | Sensor 01 | 69.07
1557392065409 | null | 1548442477 | Mercury | Sensor 00 | 60.09
1557392065409 | null | 1544928225 | Volkswagen | Sensor 01 | 6.94
1557392065409 | null | 1548442477 | Mercury | Sensor 03 | 63.13
ksql> SELECT * FROM CAR_DATA_EXPLODED_00 WHERE SOURCE='Sensor 03';
1557392065409 | null | 1544928225 | Volkswagen | Sensor 03 | 30.86
1557392065409 | null | 1548442477 | Mercury | Sensor 03 | 63.13
...