Я пытаюсь присоединиться к
- KStream: создан из темы, тема имеет значение JSON. Я повторно набираю поток, используя
два приписывают из стоимости. пример значения (фрагмент json). Я создал собственный класс pojo и использую пользовательские serdes.
{"value":"0","time":1.540753118800291E9,,"deviceIp":"111.111.111.111","deviceName":"KYZ1","indicatorName":"ifHCInOctets"}
ключи отображаются как:
map((key, value) -> KeyValue.pair(value.deviceName+value.indicatorName, value))
Я смотрю на KStream и печатаю оба ключа
и атрибуты, которые я использовал.
Выглядит все хорошо.
- KTable: я создаю ktable из темы, пишу в тему, используя скрипт на python, и ключ для темы -
KYZ1ifHCInOctets
, комбинация имени устройства и имени индикатора (сверху).
Я делаю
toStream, а затем заглянуть в результирующий поток. Ключи и ценности все кажется
хорошо.
Теперь, когда я делаю внутреннее соединение и просматриваю или просматриваю тему, я вижу, что ключ и значения не совпадают. Регистрация не работает,
KStream<String, MyPojoClass> joined= datastream.join(table,
(data,table)->data
,Joined.with(Serdes.String(),myCustomSerde,Serdes.String())
);
key = XYZ1s1_TotalDiscards
Value = {"deviceName":"ABC2", "indicatorName":"jnxCosQstatTxedBytes"}
У меня точно так же работает через ksql, но я хотел создать свое потоковое приложение.