Обычно я сопоставляю данные таблицы, чтобы получить нужный мне ключ соединения. Недавно у нас был похожий случай, когда мы должны были объединить поток с соответствующими данными в KTable. В нашем случае ключ потока был первой частью ключа таблицы, поэтому мы могли группировать по этой первой части ключа и объединять результаты в виде списка. В конце это выглядело примерно так.
final KTable<String, ArrayList<String>> theTable = builder
.table(TABLE_TOPIC, Consumed.with(keySerde, Serdes.String()))
.groupBy((k, v) -> new KeyValue<>(k.getFirstKeyPart(), v))
.aggregate(
ArrayList::new,
(key, value, list) -> {
list.add(value);
return list;
},
(key, value, list) -> {
list.remove(value);
return list;
},
Materialized.with(Serdes.String(), stringListSerde));
final KStream<String, String> theStream = builder.stream(STREAM_TOPIC);
theStream
.join(theTable, (streamEvent, tableEventList) -> tableEventList)
.flatMapValues(value -> value)
.map(this::doStuff)
.to(TARGET_TOPIC);
Я не уверен, возможно ли это для вас, то есть, возможно, вы можете каким-то образом сопоставить данные таблицы с соединением.
I Я знаю, что это не совсем относится к вашему делу, но я надеюсь, что это все равно поможет. Может быть, вы можете немного уточнить, как будет выглядеть соответствие для вашего случая.