kafka stream to ktable join - PullRequest
       45

kafka stream to ktable join

0 голосов
/ 29 октября 2018

Я пытаюсь присоединиться к

  • KStream: создан из темы, тема имеет значение JSON. Я повторно набираю поток, используя два приписывают из стоимости. пример значения (фрагмент json). Я создал собственный класс pojo и использую пользовательские serdes. {"value":"0","time":1.540753118800291E9,,"deviceIp":"111.111.111.111","deviceName":"KYZ1","indicatorName":"ifHCInOctets"}

ключи отображаются как:

map((key, value) -> KeyValue.pair(value.deviceName+value.indicatorName, value))

Я смотрю на KStream и печатаю оба ключа и атрибуты, которые я использовал. Выглядит все хорошо.

  • KTable: я создаю ktable из темы, пишу в тему, используя скрипт на python, и ключ для темы - KYZ1ifHCInOctets, комбинация имени устройства и имени индикатора (сверху). Я делаю toStream, а затем заглянуть в результирующий поток. Ключи и ценности все кажется хорошо.

Теперь, когда я делаю внутреннее соединение и просматриваю или просматриваю тему, я вижу, что ключ и значения не совпадают. Регистрация не работает,

  KStream<String, MyPojoClass> joined= datastream.join(table, 
          (data,table)->data
          ,Joined.with(Serdes.String(),myCustomSerde,Serdes.String())
          );

key = XYZ1s1_TotalDiscards
Value = {"deviceName":"ABC2", "indicatorName":"jnxCosQstatTxedBytes"}

У меня точно так же работает через ksql, но я хотел создать свое потоковое приложение.

1 Ответ

0 голосов
/ 31 октября 2018

Теперь это звучит настолько глупо, что ошибка была в моем классе PoJo, который имел несколько статических атрибутов :-(, что приводило к неправильным ключам.

...