Поток Кафки без StateStore - PullRequest
       2

Поток Кафки без StateStore

0 голосов
/ 19 октября 2018

У меня есть требование создать приложение потоков Kafka, которое может обслуживать 3 требования, а именно: фильтровать, преобразовывать и обрабатывать.У меня есть код, который необходимо изменить соответствующим образом.Специфика того, что необходимо сделать в потоке Kafka, определяется конфигурацией json (требует ли она фильтрации, фильтрации + преобразования или фильтрации + преобразования + процесса).Все это встроено в рабочую единицу, и позже эти рабочие единицы используются для создания потоков Kafka.

Существует метод apply (), который применяет workUnits к KSteams

public KStream<Long, String> apply(KStream<Long, String> stream) {
    if (isTransformer()) {
        return stream.transform(ts);
    }
    else if (isProcessor()) {
        return stream.process(ps, stateStoreNames);(ts);
    }
    else {
        return stream.filter((Long key, String input) -> jsonMatcher.doesMatch(input));
    }
}

Я хочу вернуть stream.process, как в двух других случаях, то есть stream.transform и stream.filter, но это не позволит мне сделать это, так как stream.process имеет stream.process (processorSupplier, stateStoreNames) stateStoreName в качестве параметра.Я не создаю никаких хранилищ состояний, как можно реализовать только метод process () с моим собственным классом Process, предоставляемым ProcessorSupplier?

Спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...