Моя конфигурация для потребителя описана в Документация по свойствам потребителя Spring Cloud Stream.
spring-cloud-dependencies:Finchley.SR1
springBootVersion = '2.0.5.RELEASE'
У меня 4 раздела для темы kstream_test
, и они заполнены сообщениями отпроизводитель, как показано ниже:
root@kafka:/# kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kstream_test --time -1
kstream_test:2:222
kstream_test:1:203
kstream_test:3:188
kstream_test:0:278
Моя конфигурация на основе связующего источника kafka для весеннего облака:
spring.cloud.stream.bindings.input:
destination: kstream_test
group: consumer-group-G1_test
consumer:
useNativeDecoding: true
headerMode: raw
startOffset: latest
partitioned: true
concurrency: 3
Класс прослушивателя KStream
@StreamListener
@SendTo(MessagingStreams.OUTPUT)
public KStream<?, ?> process(@Input(MessagingStreams.INPUT) KStream<?, ?> kstreams) {
......
log.info("Got a message");
......
return kstreams;
}
Мой производитель отправляет 100 сообщенийза 1 прогон.Но журналы, кажется, имеют только 1 поток StreamThread-1
, обрабатывающий сообщения, хотя у меня есть параллелизм как 3. Что может быть здесь не так?Разве 100 сообщений недостаточно, чтобы увидеть параллелизм при воспроизведении?
2018-10-18 11:50:01.923 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.923 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.945 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.956 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.972 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
ОБНОВЛЕНИЕ:
Согласно ответу, приведенная ниже конфигурация num.stream.threads
работает на связывателеуровень.
spring.cloud.stream.kafka.streams.binder.configuration:
num.stream.threads: 3