Как увеличить количество потоков, обрабатывающих записи в топологии? - PullRequest
1 голос
/ 08 ноября 2019

У меня есть топология:

Topology builder = new Topology();
        builder.addSource("source",stringDeserializer,stringDeserializer, "TOPIC-DEV-ACH")
            .addProcessor("process1", ProcessorOne::new , "source")
            .addProcessor("process2", ProcessorTwo::new , "source")
            .addProcessor("process3", ProcessorThree::new , "source")

            .addSink("sink", "asink" ,stringSerializer, stringSerializer, "process1","process2","process3");

Если я войду в систему: Thread.currentThread().getName() в process(K var1, V var2) Результат:

processor1 97527H7-e45cfcd3-6fb7-4fa9-b6a1-b3f5ed122304-StreamThread-1
processor2 97527H7-e45cfcd3-6fb7-4fa9-b6a1-b3f5ed122304-StreamThread-1
processor3 97527H7-e45cfcd3-6fb7-4fa9-b6a1-b3f5ed122304-StreamThread-1

Я хочу, чтобы MultiThreading выполнял каждый процессор в потокеа затем объединить все результаты, возможно ли это с библиотекой KafkaStreams?

1 Ответ

2 голосов
/ 09 ноября 2019

A KafkaStreams экземпляр использует StreamThreads для обработки потока.

Число StreamThreads управляется свойством конфигурации StreamsConfig.NUM_STREAM_THREADS_CONFIG (num.stream.threads), которое по умолчанию равно 1 и, следовательно, чтовы видите.

Обратите внимание, что хотя ваше приложение Kafka Streams может использовать несколько потоков, одна топология (со всеми процессорами) выполняется одним потоком.

Один поток выполняет всю топологиюпросто является потребителем исходных тем Kafka, и при этом должно быть очевидно, что количество потоков (обрабатывающих одну топологию) - это точно количество разделов тем (по модулю число KafkaStreams экземпляров приложения Kafka Streams).

...