Решение № 2, похоже, работает, но выдаёт предупреждения, потому что интерактивным запросам (interactive queries) требуется некоторое время на инициализацию, — поэтому я реализовал то же решение с помощью Transformer:
/**
 * Spring configuration exposing the Kafka Streams topology that selects, per log id,
 * the observation match with the highest score.
 *
 * <p>The topology re-keys incoming (log id, relevant observation id) records by log id, left-joins them
 * with the table of relevant observation ids, and passes the result through
 * {@link LogEntryObservationMatchTransformer}. The transformer emits a scored match only once every
 * relevant observation of the log has an entry in the scored-observations state store. Emitted matches
 * are then reduced per log id to the one with the greatest score.
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class LogBestObservationsSelectorProcessorConfig {

    /**
     * Name of the state store the transformer looks scored matches up in.
     * NOTE(review): presumably the store materialized for the GlobalKTable input — confirm in the binder config.
     */
    private static final String LOG_SCORED_OBSERVATIONS_STORE = "log-scored-observations-store";

    private final Serde<LogEntryRelevantObservationIdTuple> logEntryRelevantObservationIdTupleSerde;
    private final Serde<LogRelevantObservationIdsTuple> logRelevantObservationIdsTupleSerde;
    private final Serde<LogEntryObservationMatchTuple> logEntryObservationMatchTupleSerde;
    private final Serde<LogEntryObservationMatchIdsRelevantObservationsTuple> logEntryObservationMatchIdsRelevantObservationsTupleSerde;

    /**
     * Builds the curried three-input processor function.
     *
     * <p>Inputs, in binding order:
     * <ol>
     *   <li>a global table of scored matches keyed by (log id, relevant observation id) — not referenced by
     *       the topology itself; the transformer reads scored matches from a state store by name;</li>
     *   <li>a stream of (log id, relevant observation id) records;</li>
     *   <li>a table of relevant observation ids keyed by log id.</li>
     * </ol>
     *
     * @return a function producing a stream, keyed by log id, of the best-scoring match seen so far
     */
    @Bean
    public Function<
            GlobalKTable<LogEntryRelevantObservationIdTuple, LogEntryObservationMatchTuple>,
            Function<
                    KStream<LogEntryRelevantObservationIdTuple, LogEntryRelevantObservationIdTuple>,
                    Function<
                            KTable<String, LogRelevantObservationIdsTuple>,
                            KStream<String, LogEntryObservationMatchTuple>
                    >
            >
    >
    logBestObservationSelectorProcessor() {
        return (GlobalKTable<LogEntryRelevantObservationIdTuple, LogEntryObservationMatchTuple> logScoredObservationsTable) ->
                (KStream<LogEntryRelevantObservationIdTuple, LogEntryRelevantObservationIdTuple> logScoredObservationProcessedStream) ->
                        (KTable<String, LogRelevantObservationIdsTuple> logRelevantObservationIdsTable) ->
                                logScoredObservationProcessedStream
                                        // Re-key by log id so the stream can be joined with the per-log table.
                                        .selectKey((k, v) -> k.getLogId())
                                        .leftJoin(
                                                logRelevantObservationIdsTable,
                                                LogEntryObservationMatchIdsRelevantObservationsTuple::new,
                                                Joined.with(
                                                        Serdes.String(),
                                                        logEntryRelevantObservationIdTupleSerde,
                                                        logRelevantObservationIdsTupleSerde
                                                )
                                        )
                                        .transform(() -> new LogEntryObservationMatchTransformer(LOG_SCORED_OBSERVATIONS_STORE))
                                        .groupByKey(
                                                Grouped.with(
                                                        Serdes.String(),
                                                        logEntryObservationMatchTupleSerde
                                                )
                                        )
                                        .reduce(
                                                // Keep the higher score; ties keep the first argument.
                                                // Double.compare only guarantees the sign of its result, so test ">= 0"
                                                // rather than comparing against the literal -1.
                                                (match1, match2) -> Double.compare(match1.getScore(), match2.getScore()) >= 0 ? match1 : match2,
                                                Materialized.with(
                                                        Serdes.String(),
                                                        logEntryObservationMatchTupleSerde
                                                )
                                        )
                                        .toStream();
    }

    /**
     * Emits the scored match of the current record only when every relevant observation of the log
     * already has an entry in the scored-observations store; otherwise emits nothing.
     */
    @RequiredArgsConstructor
    private static class LogEntryObservationMatchTransformer
            implements Transformer<String, LogEntryObservationMatchIdsRelevantObservationsTuple, KeyValue<String, LogEntryObservationMatchTuple>> {

        private final String stateStoreName;
        private TimestampedKeyValueStore<LogEntryRelevantObservationIdTuple, LogEntryObservationMatchTuple> kvStore;

        /**
         * Resolves the state store by the configured name.
         *
         * @param context processor context used for the store lookup
         */
        @Override
        @SuppressWarnings("unchecked") // the store is looked up by name only, so the generic cast cannot be verified
        public void init(ProcessorContext context) {
            this.kvStore = (TimestampedKeyValueStore<LogEntryRelevantObservationIdTuple, LogEntryObservationMatchTuple>) context.getStateStore(stateStoreName);
        }

        /**
         * @param logId log id the record is keyed by
         * @param value joined record holding the processed (log id, observation id) tuple and the
         *              relevant observation ids of the log
         * @return the scored match for the record's observation keyed by log id, or {@code null}
         *         (nothing is forwarded) when the relevant ids are unknown, when some relevant observation
         *         has no store entry yet, or when the record's own observation has no store entry
         */
        @Override
        public KeyValue<String, LogEntryObservationMatchTuple> transform(String logId, LogEntryObservationMatchIdsRelevantObservationsTuple value) {
            // The upstream join is a left join: without a matching table row the right-hand side is null,
            // so there is nothing to check completeness against yet.
            val relevantObservationsTuple = value.getLogEntryRelevantObservationsTuple();
            if (relevantObservationsTuple == null) {
                return null;
            }

            val observationIds = relevantObservationsTuple.getRelevantObservations().getObservationIds();
            val allObservationsProcessed = observationIds.stream()
                    .allMatch((relevantObservationId) -> {
                        val relevantKey = LogEntryRelevantObservationIdTuple.newBuilder()
                                .setLogId(logId)
                                .setRelevantObservationId(relevantObservationId)
                                .build();
                        return kvStore.get(relevantKey) != null;
                    });
            if (!allObservationsProcessed) {
                return null;
            }

            // NOTE(review): the key builder uses setRelevantObservationId while this accessor is
            // getObservationId — confirm the accessor name against the schema.
            val observationId = value.getLogEntryRelevantObservationIdTuple().getObservationId();
            val key = LogEntryRelevantObservationIdTuple.newBuilder()
                    .setLogId(logId)
                    .setRelevantObservationId(observationId)
                    .build();
            ValueAndTimestamp<LogEntryObservationMatchTuple> scoredMatch = kvStore.get(key);
            // The record's own observation is not necessarily among the ids checked above,
            // so its store entry may still be missing.
            if (scoredMatch == null) {
                return null;
            }
            return new KeyValue<>(logId, scoredMatch.value());
        }

        /** Nothing to release: the transformer holds only the store name and the store reference. */
        @Override
        public void close() {
        }
    }
}