Sping Cloud Stream Kafka - использует сообщения в пакетном режиме и отправляет их как отдельные сообщения - PullRequest
0 голосов
/ 08 ноября 2019

Я пытаюсь подготовить наше приложение к следующему выпуску Spring Cloud Stream. (в настоящее время используется 3.0.0.RC1). Использование Kafka Binder.

Сейчас мы получаем одно сообщение, обрабатываем его и отправляем в другую тему. Обработка каждого сообщения в отдельности приводит к множеству отдельных запросов к нашей базе данных.

В версии 3.0.0 мы хотим обрабатывать сообщения как пакетные, поэтому мы можем сохранять данные в пакетном обновлении.

В текущей версии мы используем @EnableBinding, @ StreamListener

@StreamListener( ExchangeableItemProcessor.STOCK_INPUT )
public void processExchangeableStocks( final ExchangeableStock item ) {
    publishItems( exchangeableItemProcessor.stocks(), articleService.updateStockInformation( Collections.singletonList( item ) ) );
}

void publishItems( final MessageChannel messageChannel, final List<? extends ExchangeableItem> item ) {
    for ( final ExchangeableItem exchangeableItem : item ) {
        final Message<ExchangeableItem> message = MessageBuilder.withPayload( exchangeableItem )
                            .setHeader( "partitionKey", exchangeableItem.getId() )
                            .build();
        messageChannel.send( message )
    }
}

Я установил свойства потребителя в «пакетный режим» и изменил сигнатуру на List<>, но в результате получаюList<byte[]> вместо ожидаемого List<ExchangeableStock>. Ofc возможно выполнить преобразование позже, но это похоже на «meh», я думаю, что это должно произойти до вызова Listener.

Затем я попробовал (новую) функциональную версию, и потребление работает нормально,Мне также нравится эта простая версия обработки

@Bean
public Function<List<ExchangeableStock>, List<ExchangeableStock>> stocks() {
    return articleService::updateStockInformation;
}

Но тема вывода теперь получает список объектов как одно сообщение, и следующие потребители не работают правильно.

Я думаю, что что-то пропустил...

Нужно ли добавить какой-либо MessageConverter (для версии, управляемой аннотациями), или есть ли способ достичь желаемого поведения с помощью функциональной версии?

Ответы [ 2 ]

0 голосов
/ 12 ноября 2019

Мне это удалось:

@Bean
@Measure
public Consumer<List<ExchangeableStock>> stocks() {
    return items -> {
        for ( final ExchangeableStock exchangeableItem : articleService. updateStockInformation( items ) ) {
            final Message<?> message = MessageBuilder.withPayload( exchangeableItem )
                            .setHeader( "partitionKey", exchangeableItem.getId() )
                            .setHeader( KafkaHeaders.TOPIC, "stocks-stg" )
                            .build();

            processor.onNext( message );
        }
    };
}

private final TopicProcessor<Message<?>> processor = TopicProcessor.create();

@Bean
@Measure
public Supplier<Flux<?>> source() {
    return () -> processor;
}

Но динамическое разрешение по месту назначения не работает для меня. Я попытался использовать KafkaHeaders.TOPIC и spring.cloud.stream.sendto.destination в качестве заголовка и установить свойство производителя привязок Kafka use-topic-header: true (для привязки source-out-0)

Если я установил пункт назначения для source-out-0, это работает, но это приведет к большому количеству TopicProceessor s и Supplier s - у нас есть около 10 различных типов сообщений.

Может быть, я упустил что-то маленькое, чтобы заставить работать динамическое назначение ...

0 голосов
/ 08 ноября 2019

IIRC, пакетный режим поддерживается только функциями.

Разве вы не можете использовать Consumer<List< ExchangeableStock>> и отправлять сообщения на канал, как вы это делаете в своем StreamListener?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...