Как я могу обработать метод @KafkaListener в разных потоках? - PullRequest
0 голосов
/ 19 марта 2019

У меня есть обработчик kafka в весенней загрузке:

    @KafkaListener(topics = "topic-one", groupId = "response")
    public void listen(String response) {
        myService.processResponse(response);
    }

Например, производитель отправляет одно сообщение каждую секунду. Но myService.processResponse работают 10 секунд. Мне нужно обработать каждое сообщение и начать myService.processResponse в новой теме. Я могу создать своего исполнителя и делегировать каждый ответ на него. Но я думаю, что для них есть другие конфиги в кафке. Я нашел 2:

1) добавить concurrency = "5" к аннотации @KafkaListener - похоже, работает. Но я не уверен, насколько правильно, потому что у меня есть второй путь:

2) Я могу создать ConcurrentKafkaListenerContainerFactory и установить для него ConsumerFactory и concurrency

Я не понимаю разницы между этими методами? достаточно ли просто добавить concurrency = "5" к аннотации @KafkaListener или мне нужно создать ConcurrentKafkaListenerContainerFactory?

Или я вообще ничего не понимаю и есть другой способ?

Ответы [ 3 ]

3 голосов
/ 19 марта 2019

Использование executor усложняет задачу, связанную с управлением зафиксированными смещениями;это не рекомендуется.

При @KafkaListener каркас создает для вас ConcurrentKafkaListenerContainerFactory.

concurrency для аннотации - просто удобство;он переопределяет заводские настройки.

Это позволяет вам использовать одну и ту же фабрику с несколькими слушателями, каждый с разным параллелизмом.

Вы можете установить параллелизм контейнера (по умолчанию), используя свойство загрузки;это значение переопределяется значением аннотации;увидеть Javadocs ...

/**
 * Override the container factory's {@code concurrency} setting for this listener. May
 * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
 * which case {@link Number#intValue()} is used to obtain the value.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the concurrency.
 * @since 2.2
 */
String concurrency() default "";
0 голосов
/ 24 июня 2019
Опция

concurrency не имеет ничего общего с одновременной обработкой сообщений, полученных одним и тем же потребителем. Это для групп потребителей, когда у вас есть несколько потребителей, каждый из которых обрабатывает свои собственные разделы.

Передача обработки в отдельный поток очень сложна, и команда Spring-Kafka решила не делать этого «нарочно», как мне кажется. Вам даже не нужно копаться в Spring-Kafka, чтобы понять почему. Проверьте KafkaConsumer's Обнаружение отказов потребителей doc:

Необходимо соблюдать осторожность, чтобы гарантировать, что совершенные смещения не получат впереди фактической позиции. Как правило, вы должны отключить автоматический фиксирует и вручную фиксирует обработанные смещения для записей только после нить закончила их обработку (в зависимости от доставки семантика вам нужна). Обратите внимание, что вам нужно будет приостановить раздел таким образом, чтобы от опроса не поступало новых записей до поток завершил обработку ранее возвращенных.

0 голосов
/ 19 марта 2019

Вам необходимо myService.processResponse(response); метод асинхронного.

Для этого вам нужно только сделать это:

@Async
public void processResponse(String response) {
    // Process here
}

Также вам нужно включить асинхронные функции, добавив @EnableAsyncв верхней части вашего основного класса.

@SpringBootApplication
@EnableAsync
public class Application {

    public static void main(String[] args) {
        // close the application context to shut down the custom ExecutorService
        SpringApplication.run(Application.class, args).close();
    }

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("GithubLookup-");
        executor.initialize();
        return executor;
    }
}

Проверьте, что ссылка , если у вас есть какие-либо проблемы!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...