Spring Cloud Stream KStream Consumer Concurrency никак не влияет? - PullRequest
0 голосов
/ 18 октября 2018

Моя конфигурация для потребителя описана в Документация по свойствам потребителя Spring Cloud Stream.

spring-cloud-dependencies:Finchley.SR1
springBootVersion = '2.0.5.RELEASE'

У меня 4 раздела для темы kstream_test, и они заполнены сообщениями отпроизводитель, как показано ниже:

root@kafka:/# kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kstream_test --time -1
kstream_test:2:222
kstream_test:1:203
kstream_test:3:188
kstream_test:0:278

Моя конфигурация на основе связующего источника kafka для весеннего облака:

spring.cloud.stream.bindings.input:
  destination: kstream_test
  group: consumer-group-G1_test
  consumer:
    useNativeDecoding: true
    headerMode: raw
    startOffset: latest
    partitioned: true
    concurrency: 3

Класс прослушивателя KStream

    @StreamListener
    @SendTo(MessagingStreams.OUTPUT)
    public KStream<?, ?> process(@Input(MessagingStreams.INPUT) KStream<?, ?> kstreams) {
        ......
        log.info("Got a message");
        ......
        return kstreams;
    }

Мой производитель отправляет 100 сообщенийза 1 прогон.Но журналы, кажется, имеют только 1 поток StreamThread-1, обрабатывающий сообщения, хотя у меня есть параллелизм как 3. Что может быть здесь не так?Разве 100 сообщений недостаточно, чтобы увидеть параллелизм при воспроизведении?

2018-10-18 11:50:01.923  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.923  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.945  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.956  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.972  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message

ОБНОВЛЕНИЕ:

Согласно ответу, приведенная ниже конфигурация num.stream.threads работает на связывателеуровень.

spring.cloud.stream.kafka.streams.binder.configuration:
 num.stream.threads: 3

1 Ответ

0 голосов
/ 18 октября 2018

Кажется, что num.stream.threads должен быть установлен для увеличения параллелизма ...

/** {@code num.stream.threads} */
@SuppressWarnings("WeakerAccess")
public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";

... по умолчанию он равен 1.

Связыватель должен действительно установить, чтона основе свойства ...consumer.concurrency;откройте, пожалуйста, проблему github для связывателя .

Тем временем вы можете просто установить это свойство непосредственно в ...consumer.configuration.

ИСПРАВЛЕНИЕ

Мне только что сказали, что ...consumer.configuration в настоящее время также не применяется к связыванию потоков;вам придется установить его на уровне связующего.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...