Double array explode in ksqlDB 0.8.1
0 votes
/ March 31, 2020

DATA.

kafkacat -b 127.0.0.1 -t group-topic -P
{"groups":[{"name":"Roberth","surname":"Smith","origin":"England","albums":["Wish","Desintegration"],"group":"The Cure"},{"name":"Peter","surname":"Murphy","origin":"England","albums":["Mask","In The Flat Field"],"group":"Bauhaus"}]}

// STREAM STRUCTURE


SET 'auto.offset.reset' = 'earliest';


CREATE STREAM GROUPS_01 
(groups ARRAY<STRUCT<
            albums ARRAY<VARCHAR>,
            name VARCHAR,
            surname VARCHAR
            >>) 
WITH (kafka_topic='group-topic', value_format='JSON');
SELECT 
EXPLODE(groups)->name AS name,
EXPLODE(groups)->surname AS surname,
EXPLODE(groups)->albums AS albums
FROM GROUPS_01 
EMIT CHANGES;

// What I get

NAME       SURNAME     ALBUMS
Roberth    Smith       [Wish,Desintegration]
Peter      Murphy      [Mask,In The Flat Field]

// What I need

NAME       SURNAME     ALBUM
Roberth    Smith       Wish
Roberth    Smith       Desintegration
Peter      Murphy      Mask
Peter      Murphy      In The Flat Field

// What I tried

EXPLODE(groups)->EXPLODE(albums)->album AS album

EXPLODE(albums)->album AS album

1 Answer

1 vote
/ March 31, 2020

For clarity, here is the source data you provided:

{
    "groups": [
        {
            "name": "Roberth",
            "surname": "Smith",
            "origin": "England",
            "albums": [
                "Wish",
                "Desintegration"
            ],
            "group": "The Cure"
        },
        {
            "name": "Peter",
            "surname": "Murphy",
            "origin": "England",
            "albums": [
                "Mask",
                "In The Flat Field"
            ],
            "group": "Bauhaus"
        }
    ]
}

First, explode the root array into one row per group:

ksql> CREATE STREAM EX1A AS SELECT EXPLODE(GROUPS) AS GROUP_SINGLE FROM GROUPS_01 EMIT CHANGES;

 Message
-----------------------------------
 Created query with ID CSAS_EX1A_5
-----------------------------------

This gives us:

ksql> SELECT * FROM EX1A EMIT CHANGES;
+----------------+-------+-----------------------------------------------------------+
|ROWTIME         |ROWKEY |GROUP_SINGLE                                               |
+----------------+-------+-----------------------------------------------------------+
|1585666857714   |null   |{ALBUMS=[Wish, Desintegration], NAME=Roberth, SURNAME=Smith|
|                |       |}                                                          |
|1585666857714   |null   |{ALBUMS=[Mask, In The Flat Field], NAME=Peter, SURNAME=Murp|
|                |       |hy}                                                        |

Now use the -> operator to access the fields of the nested struct, and explode the ALBUMS array:

CREATE STREAM ALBUMS_EXPLODED AS 
    SELECT GROUP_SINGLE->NAME AS NAME, 
           GROUP_SINGLE->SURNAME AS SURNAME, 
           EXPLODE(GROUP_SINGLE->ALBUMS) AS ALBUM 
      FROM EX1A 
      EMIT CHANGES;

ksql> SELECT NAME, SURNAME, ALBUM FROM ALBUMS_EXPLODED EMIT CHANGES;
+-------------------+----------------------+-------------------+
|NAME               |SURNAME               |ALBUM              |
+-------------------+----------------------+-------------------+
|Roberth            |Smith                 |Wish               |
|Roberth            |Smith                 |Desintegration     |
|Peter              |Murphy                |Mask               |
|Peter              |Murphy                |In The Flat Field  |
...
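
To make the row-level effect of the two chained queries easier to follow, here is a minimal sketch of the same logic outside ksqlDB. It is plain Python, not ksqlDB code, and it only imitates the shape of the output for the single message from the question. The helper names explode_groups and explode_albums are hypothetical and exist only for this illustration.

```python
import json

# The single message produced to group-topic in the question.
MESSAGE = (
    '{"groups":[{"name":"Roberth","surname":"Smith","origin":"England",'
    '"albums":["Wish","Desintegration"],"group":"The Cure"},'
    '{"name":"Peter","surname":"Murphy","origin":"England",'
    '"albums":["Mask","In The Flat Field"],"group":"Bauhaus"}]}'
)


def explode_groups(record):
    """Imitate step 1 (stream EX1A): one row per element of `groups`."""
    for group in record["groups"]:
        yield {"GROUP_SINGLE": group}


def explode_albums(rows):
    """Imitate step 2 (stream ALBUMS_EXPLODED): one row per album,
    repeating the name and surname of the enclosing group."""
    for row in rows:
        group = row["GROUP_SINGLE"]
        for album in group["albums"]:
            yield (group["name"], group["surname"], album)


# Chain the two steps, as the second stream reads from the first.
rows = list(explode_albums(explode_groups(json.loads(MESSAGE))))

for name, surname, album in rows:
    print(name, surname, album)
```

The first function corresponds to exploding the root array, the second to dereferencing the struct and exploding its inner array. Chained together they yield the same four NAME, SURNAME, ALBUM rows as the final table above.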