Почему Spring Integration QueueChannel запускается последовательно с задержкой доставки сообщения в kafka - PullRequest
1 голос
/ 06 апреля 2020

При использовании интеграции Kafka и настройке QueueChannel,

Обработка сообщений после получения канала очереди выполняется последовательно с задержкой в ​​одну секунду, невозможно понять причину, канал очереди должен быть накоплением сообщений (до установленного предела) и освобождать сообщения из очереди, пока она не пуста и существует потребитель. Почему сообщения передаются последовательно с задержкой в ​​одну секунду?

follows the log, as can be seen, the messages are received immediately (according to the date of the log) and are processed sequentially with a delay of 1 second?
2020-04-06 13:08:28.108  INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 2 - enriched
2020-04-06 13:08:28.109  INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 2 - enriched
2020-04-06 13:08:28.110  INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 7 - enriched
2020-04-06 13:08:28.111  INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 5 - enriched
2020-04-06 13:08:28.116  INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 6 - enriched
2020-04-06 13:08:28.119  INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 4 - enriched
2020-04-06 13:08:28.120  INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 1 - enriched
2020-04-06 13:08:28.121  INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 8 - enriched
2020-04-06 13:08:28.122  INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 3 - enriched
2020-04-06 13:08:28.123  INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 9 - enriched
2020-04-06 13:08:28.124  INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 10 - enriched
2020-04-06 13:08:29.111  INFO 30718 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 7 - enriched
2020-04-06 13:08:30.112  INFO 30718 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 5 - enriched
2020-04-06 13:08:31.112  INFO 30718 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 6 - enriched
2020-04-06 13:08:32.113  INFO 30718 --- [ask-scheduler-5] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 4 - enriched
2020-04-06 13:08:33.113  INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 1 - enriched
2020-04-06 13:08:34.113  INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 8 - enriched
2020-04-06 13:08:35.113  INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 3 - enriched
2020-04-06 13:08:36.114  INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 9 - enriched
2020-04-06 13:08:37.114  INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 10 - enriched

Blockquote

package br.com.gubee.kafaexample

import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType
import org.springframework.integration.annotation.Gateway
import org.springframework.integration.annotation.MessagingGateway
import org.springframework.integration.config.EnableIntegration
import org.springframework.integration.context.IntegrationContextUtils
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.integration.kafka.dsl.Kafka
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController


@RestController
@RequestMapping(path = ["/testKafka"], produces = [MediaType.APPLICATION_JSON_VALUE])
class TestKafkaResource(private val testKafkaGateway: TestKafkaGateway) {

    @GetMapping("init/{param}")
    fun init(@PathVariable("param", required = false) param: String? = null) {
        (1..10).forEach {
            println("Send async item $it")
            testKafkaGateway.init("item: $it")
        }
    }

}

@MessagingGateway(errorChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
@Component
interface TestKafkaGateway {
    @Gateway(requestChannel = "publishKafkaChannel")
    @Async
    fun init(param: String)
}

@Configuration
@EnableIntegration
class TestKafkaFlow(private val kafkaTemplate: KafkaTemplate<*, *>,
                    private val consumerFactory: ConsumerFactory<*, *>) {

    @Bean
    fun readKafkaChannelTopic(): NewTopic {
        return NewTopic("readKafkaChannel", 40, 1)
    }

    @Bean
    fun publishKafka(): IntegrationFlow {
        return IntegrationFlows
                .from("publishKafkaChannel")
                .transform<String, String> { "${it} - enriched" }
                .handle(
                        Kafka.outboundChannelAdapter(kafkaTemplate)
                                .topic("readKafkaChannel")
                                .sendFailureChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
                )
                .get()
    }

    @Bean
    fun readFromKafka(): IntegrationFlow {
        return IntegrationFlows
                .from(
                        Kafka.messageDrivenChannelAdapter(consumerFactory, "readKafkaChannel")
                                .configureListenerContainer { kafkaMessageListenerContainer ->
                                    kafkaMessageListenerContainer.concurrency(2)
                                    kafkaMessageListenerContainer.ackMode(ContainerProperties.AckMode.RECORD)
                                }
                                .errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
                )
                .channel { c -> c.queue(10) }
                .log<String> {
                    "readKafkaChannel: ${it.payload}"
                }
                .channel("channelThatIsProcessingSequential")
                .get()
    }

    @Bean
    fun kafkaFlowAfter(): IntegrationFlow {
        return IntegrationFlows
                .from("channelThatIsProcessingSequential")
                .log<String> {
                    "channelThatIsProcessingSequential - ${it.payload}"
                }
                .get()
    }
}

1 Ответ

3 голосов
/ 06 апреля 2020

Как сказал Гари, нехорошо переводить сообщения Кафки в QueueChannel. Потребление на Kafka.messageDrivenChannelAdapter() уже asyn c - на самом деле нет причин перемещать сообщения в отдельный поток.

В любом случае, похоже, что вы используете Spring Cloud Stream с его PollerMetadata настроен на 1 message per second политику опроса.

Если это не соответствует вашим требованиям, вы всегда можете изменить это .channel { c -> c.queue(10) }, чтобы использовать вторую лямбду и настроить там poller.

Кстати, у нас уже есть некоторая реализация Kotlin DSL в Spring Integration: https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/kotlin-dsl.html#kotlin -dsl

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...