автозапуск для @StreamListener - PullRequest
       58

автозапуск для @StreamListener

0 голосов
/ 28 февраля 2020

В отличие от @KafkaListener, похоже, @StreamListener не поддерживает параметр autoStartup. Есть ли способ добиться такого же поведения для @StreamListener? Вот мой пример использования:

У меня есть универсальное c Spring приложение, которое может прослушивать любую топику Kafka c и записывать в соответствующую таблицу в моей базе данных. По некоторым темам громкость низкая, и, таким образом, обработка одного сообщения с очень низкой задержкой вполне подходит. Для других тем, которые имеют большой объем, код должен получать микропакет сообщений и записывать в базу данных, используя пакет Jdb c реже. В идеале определение для слушателей должно выглядеть примерно так:

// low volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.singleMessageListenerEnabled}")
public void handleSingleMessage(@Payload GenericRecord message) ...

// high volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.multipleMessageListenerEnabled}")
public void handleMultipleMessages(@Payload List<GenericRecord> messageList) ...

Для топи с низкой громкостью c я бы установил application.singleMessageListenerEnabled на true и application.multipleMessageListenerEnabled на false и наоборот для больших объемов topi c. Таким образом, только один из слушателей будет активно слушать сообщения, а другой не будет активно слушать.

Есть ли способ добиться этого с помощью @StreamListener?

1 Ответ

0 голосов
/ 28 февраля 2020

Во-первых, рассмотрите возможность обновления до функциональной модели программирования , что займет у вас минуты на рефакторинг. Мы почти отказались от модели программирования на основе аннотаций. Если вы делаете то, что пытаетесь выполнить, sh очень просто:

@SpringBootApplication
public class SimpleStreamApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SimpleStreamApplication.class);
    }

    @Bean
    public Consumer<GenericRecord> singleRecordConsumer() {...}

    @Bean
    public Consumer<List<GenericRecord>> multipleRecordConsumer() {...}
}

Тогда вы можете просто использовать свойство --spring.cloud.function.definition=singleRecordConsumer для одного случая и --spring.cloud.function.definition=multipleRecordConsumer при запуске приложения, это гарантируя, какой именно c слушатель вы хотите активировать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...