При использовании интеграции Kafka и настройке QueueChannel,
Обработка сообщений после получения канала очереди выполняется последовательно с задержкой в одну секунду, невозможно понять причину, канал очереди должен быть накоплением сообщений (до установленного предела) и освобождать сообщения из очереди, пока она не пуста и существует потребитель. Почему сообщения передаются последовательно с задержкой в одну секунду?
follows the log, as can be seen, the messages are received immediately (according to the date of the log) and are processed sequentially with a delay of 1 second?
2020-04-06 13:08:28.108 INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 2 - enriched
2020-04-06 13:08:28.109 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 2 - enriched
2020-04-06 13:08:28.110 INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 7 - enriched
2020-04-06 13:08:28.111 INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 5 - enriched
2020-04-06 13:08:28.116 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 6 - enriched
2020-04-06 13:08:28.119 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 4 - enriched
2020-04-06 13:08:28.120 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 1 - enriched
2020-04-06 13:08:28.121 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 8 - enriched
2020-04-06 13:08:28.122 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 3 - enriched
2020-04-06 13:08:28.123 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 9 - enriched
2020-04-06 13:08:28.124 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 10 - enriched
2020-04-06 13:08:29.111 INFO 30718 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 7 - enriched
2020-04-06 13:08:30.112 INFO 30718 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 5 - enriched
2020-04-06 13:08:31.112 INFO 30718 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 6 - enriched
2020-04-06 13:08:32.113 INFO 30718 --- [ask-scheduler-5] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 4 - enriched
2020-04-06 13:08:33.113 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 1 - enriched
2020-04-06 13:08:34.113 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 8 - enriched
2020-04-06 13:08:35.113 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 3 - enriched
2020-04-06 13:08:36.114 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 9 - enriched
2020-04-06 13:08:37.114 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 10 - enriched
Blockquote
package br.com.gubee.kafaexample
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType
import org.springframework.integration.annotation.Gateway
import org.springframework.integration.annotation.MessagingGateway
import org.springframework.integration.config.EnableIntegration
import org.springframework.integration.context.IntegrationContextUtils
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.integration.kafka.dsl.Kafka
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping(path = ["/testKafka"], produces = [MediaType.APPLICATION_JSON_VALUE])
class TestKafkaResource(private val testKafkaGateway: TestKafkaGateway) {
@GetMapping("init/{param}")
fun init(@PathVariable("param", required = false) param: String? = null) {
(1..10).forEach {
println("Send async item $it")
testKafkaGateway.init("item: $it")
}
}
}
@MessagingGateway(errorChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
@Component
interface TestKafkaGateway {
@Gateway(requestChannel = "publishKafkaChannel")
@Async
fun init(param: String)
}
@Configuration
@EnableIntegration
class TestKafkaFlow(private val kafkaTemplate: KafkaTemplate<*, *>,
private val consumerFactory: ConsumerFactory<*, *>) {
@Bean
fun readKafkaChannelTopic(): NewTopic {
return NewTopic("readKafkaChannel", 40, 1)
}
@Bean
fun publishKafka(): IntegrationFlow {
return IntegrationFlows
.from("publishKafkaChannel")
.transform<String, String> { "${it} - enriched" }
.handle(
Kafka.outboundChannelAdapter(kafkaTemplate)
.topic("readKafkaChannel")
.sendFailureChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
)
.get()
}
@Bean
fun readFromKafka(): IntegrationFlow {
return IntegrationFlows
.from(
Kafka.messageDrivenChannelAdapter(consumerFactory, "readKafkaChannel")
.configureListenerContainer { kafkaMessageListenerContainer ->
kafkaMessageListenerContainer.concurrency(2)
kafkaMessageListenerContainer.ackMode(ContainerProperties.AckMode.RECORD)
}
.errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
)
.channel { c -> c.queue(10) }
.log<String> {
"readKafkaChannel: ${it.payload}"
}
.channel("channelThatIsProcessingSequential")
.get()
}
@Bean
fun kafkaFlowAfter(): IntegrationFlow {
return IntegrationFlows
.from("channelThatIsProcessingSequential")
.log<String> {
"channelThatIsProcessingSequential - ${it.payload}"
}
.get()
}
}