У меня есть требование создать приложение потоков Kafka, которое может обслуживать 3 требования, а именно: фильтровать, преобразовывать и обрабатывать.У меня есть код, который необходимо изменить соответствующим образом.Специфика того, что необходимо сделать в потоке Kafka, определяется конфигурацией json (требует ли она фильтрации, фильтрации + преобразования или фильтрации + преобразования + процесса).Все это встроено в рабочую единицу, и позже эти рабочие единицы используются для создания потоков Kafka.
Существует метод apply (), который применяет workUnits к KSteams
public KStream<Long, String> apply(KStream<Long, String> stream) {
if (isTransformer()) {
return stream.transform(ts);
}
else if (isProcessor()) {
return stream.process(ps, stateStoreNames);(ts);
}
else {
return stream.filter((Long key, String input) -> jsonMatcher.doesMatch(input));
}
}
Я хочу вернуть stream.process, как в двух других случаях, то есть stream.transform и stream.filter, но это не позволит мне сделать это, так как stream.process имеет stream.process (processorSupplier, stateStoreNames) stateStoreName в качестве параметра.Я не создаю никаких хранилищ состояний, как можно реализовать только метод process () с моим собственным классом Process, предоставляемым ProcessorSupplier?
Спасибо