Процессорный узел в потоках кафки - PullRequest
0 голосов
/ 08 мая 2018

Я работаю на processor node в потоках кафки. Для простого кода я написал, как показано ниже, только для фильтрации UserID, это правильный способ выполнения processor node в потоках kafka?

Но приведенный ниже код не компилируется, выдает ошибку с: The method filter(Predicate<? super Object,? super Object>) in the type KStream<Object,Object> is not applicable for the arguments (new Predicate<String,String>(){})

KStreamBuilder builder = new KStreamBuilder();

builder.stream(topic)
    .filter(new Predicate <String, String>() {
        //@Override
        public boolean test(String key, String value) {
            Hashtable<Object, Object> message;
            // put you processor logic here
            return message.get("UserID").equals("1");
        }
    })
    .to(streamouttopic);

    final KafkaStreams streams = new KafkaStreams(builder, props);
    final CountDownLatch latch = new CountDownLatch(1);

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
       streams.close();
       latch.countDown();
    }
});

try {
    streams.start();
    latch.await();
} catch (Throwable e) {
    System.exit(1);
}
System.exit(0);

Может, кто-нибудь направит меня, пожалуйста?

Ответы [ 2 ]

0 голосов
/ 08 мая 2018

builder.stream(topic) возвращает KStream<Object,Object> тип, потому что вы не указываете универсальные типы. И <Object,Object> не совместим с <String,String>.

Если вы знаете, что фактический тип - KStream<String,String>, вы можете указать тип следующим образом:

builder.<Sting,String>stream(topic)
       .filter(...)

Чтобы ответить на ваш вопрос о «узлах процессора»: да, добавление filter() добавит узел процессора внутри. Обратите внимание, что на уровне DSL обычно не нужно думать о процессорах.

Если вы хотите использовать процессоры явно, вы можете использовать Processor API вместо DSL. Посмотрите пример WordCount: https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java

Обратите внимание, что при использовании DSL внутренне код будет преобразован в топологию процессора, которая является моделью времени выполнения Kafka Streams.

0 голосов
/ 08 мая 2018

Возможно, вы используете Predicate класс из другого пакета. Вам нужно использовать

import org.apache.kafka.streams.kstream.Predicate;
...