Соединение двух потоков Кафки - PullRequest
0 голосов
/ 24 мая 2018

пытается соединить два Kstream, но я получаю сообщение об ошибке несоответствия типов, ниже приведен фрагмент кода.

  KStream<String, String> longCounts = netExpence.join(netIncome, (key1, key2) -> key1 + "/" + key2,                    JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),stringSerde, stringSerde, stringSerde);

Приходит ошибка: Несоответствие типов: невозможно преобразовать из String в R Это синтаксис для присоединения к kstreams join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Joined<K,V,VO> joined)

, пожалуйста, объясните, что именно ValueJoiner<? super V,? super VO,? extends VR> делает .. Спасибо

1 Ответ

0 голосов
/ 24 мая 2018

ValueJoiner вызывается с двумя значениями совпадающих записей и выдает значение результата объединения.

// key type must be the same for a join
// value type can be different
KStream<KeyType, ValueType1> stream1 = ...
KStream<KeyType, ValueType2> stream2 = ...

KStream<KeyType, OutputType> joined = stream1.join(stream2, ...);

Таким образом, ValueJoiner должно иметь ValueType1 в качестве первого универсального (? super V) и ValueType2 в качестве второго универсального (? super VO).Для третьего универсального (? extend VR) вы указываете тип вывода (например, OutputType из приведенного выше примера).

Обновление

Вам также необходимо настроитьправильный Serdes для времени выполнения.Если все типы одинаковы, лучше всего установить значения по умолчанию через StreamsConfig для ключа и / или значения соответственно.В противном случае вы можете перезаписать Serdes по умолчанию для каждого оператора:

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...