Поток KSQL из темы с разнородными структурами JSON - PullRequest
0 голосов
/ 19 декабря 2018

Есть ли способ создать поток из темы, определяющей, что вся запись должна рассматриваться как VARCHAR, чтобы я мог создавать потоки из нее с помощью extractjsonfield ()?Примеры записей могут выглядеть примерно так:

{
  "Header": {
    "RecType": "RecA",
    ... more header records in a fairly consistent format ...
  },
  "RAFld1": {
    "someFld": "some data",
    "someOtherField": 1.001,
  },
  "RAFld2": {
    "aFld": "data",
    "anotherFld": 98.6,
    ...
  },
  ...
}

Но следующая запись может выглядеть следующим образом:

{
  "Header": {
    "RecType": "RecB",
    ... more header records in a fairly consistent format ...
  },
  "RBFld1": {
    "randomFld": "random data",
    "randomOtherField": 1.001,
    ...
  }
}

Я могу решить, как определить исходный поток с известными полями типа VARCHAR изатем extractjsonfield () (с соответствующим предложением where), но не вижу способа сказать, что структура верхнего уровня не имеет последовательно именованных полей.

Это способ форматирования моей входной темы;Я не могу изменить этот формат.Я надеялся, что KSQL станет изящным решением, но, похоже, я застрял с самого начала, потому что не смог справиться с этой динамической структурой.

1 Ответ

0 голосов
/ 19 декабря 2018

Неважно, если вы называете поля в вашей схеме, которые не присутствуют в каждом сообщении;вы просто получите null значений.

Я подумал, что ваш вопрос интересный, и написал объяснение того, как KSQL может работать здесь - дайте мне знать, если вы хотите что-то еще с этим сделать, и я могу расширить ответ.


  1. Проверка необработанных данных:

    ksql> PRINT 'source_data' FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1545239521600,"ROWKEY":"null","Header":{"RecType":"RecA"},"RAFld1":{"someFld":"some data","someOtherField":1.001},"RAFld2":{"aFld":"data","anotherFld":98.6}}
    {"ROWTIME":1545239526600,"ROWKEY":"null","Header":{"RecType":"RecB"},"RBFld1":{"randomFld":"random data","randomOtherField":1.001}}
    
  2. Зарегистрируйте тему source_data для использования в качестве потока KSQL с именем my_stream:

    CREATE STREAM my_stream (Header VARCHAR, \
                             RAFld1 VARCHAR, \
                             RAFld2 VARCHAR, \
                             RBFld1 VARCHAR) \
    WITH (KAFKA_TOPIC='source_data', VALUE_FORMAT='JSON');
    
  3. Проверьте сообщения.Обратите внимание, что во втором сообщении (тип записи «B») значение «RAFld1» отсутствует, и поэтому отображается null:

    ksql> SELECT Header, RAFld1 FROM my_stream LIMIT 2;
    {"RecType":"RecA"} | {"someOtherField":1.001,"someFld":"some data"}
    {"RecType":"RecB"} | null
    
  4. Заполните новыйТема Kafka с только значениями типа «A» записи, использующими EXTRACTFROMJSON для фильтрации типов записей по значению заголовка и извлечения именованных полей из полезной нагрузки:

    CREATE STREAM recA_data WITH (VALUE_FORMAT='AVRO') AS \
    SELECT EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS someOtherField, \
            EXTRACTJSONFIELD(RAFld1,'$.someFld')        AS someFld, \
            EXTRACTJSONFIELD(RAFld2,'$.aFld')           AS aFld, \
            EXTRACTJSONFIELD(RAFld2,'$.anotherFld')     AS anotherFld \
            FROM my_stream \
    WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecA';
    

    Обратите внимание, что сериализация переключается наAvro, чтобы схема была автоматически доступна любому потребителю, без необходимости вручную ее объявлять.

  5. Обратите внимание, что новый поток имеет схему и постоянно заполняется сообщениями по мере их поступления в исходную тему source_data:

    ksql> DESCRIBE recA_data;
    
    Name                 : RECA_DATA
    Field          | Type
    --------------------------------------------
    ROWTIME        | BIGINT           (system)
    ROWKEY         | VARCHAR(STRING)  (system)
    SOMEOTHERFIELD | VARCHAR(STRING)
    SOMEFLD        | VARCHAR(STRING)
    AFLD           | VARCHAR(STRING)
    ANOTHERFLD     | VARCHAR(STRING)
    --------------------------------------------
    For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
    
    ksql> SELECT * FROM recA_data;
    1545240188787 | null | 1.001 | some data | data | 98.6
    
...