Я пытаюсь подготовить наше приложение к следующему выпуску Spring Cloud Stream. (в настоящее время используется 3.0.0.RC1). Использование Kafka Binder.
Сейчас мы получаем одно сообщение, обрабатываем его и отправляем в другую тему. Обработка каждого сообщения в отдельности приводит к множеству отдельных запросов к нашей базе данных.
В версии 3.0.0 мы хотим обрабатывать сообщения как пакетные, поэтому мы можем сохранять данные в пакетном обновлении.
В текущей версии мы используем @EnableBinding, @ StreamListener
@StreamListener( ExchangeableItemProcessor.STOCK_INPUT )
public void processExchangeableStocks( final ExchangeableStock item ) {
publishItems( exchangeableItemProcessor.stocks(), articleService.updateStockInformation( Collections.singletonList( item ) ) );
}
void publishItems( final MessageChannel messageChannel, final List<? extends ExchangeableItem> item ) {
for ( final ExchangeableItem exchangeableItem : item ) {
final Message<ExchangeableItem> message = MessageBuilder.withPayload( exchangeableItem )
.setHeader( "partitionKey", exchangeableItem.getId() )
.build();
messageChannel.send( message )
}
}
Я установил свойства потребителя в «пакетный режим» и изменил сигнатуру на List<>
, но в результате получаюList<byte[]>
вместо ожидаемого List<ExchangeableStock>
. Ofc возможно выполнить преобразование позже, но это похоже на «meh», я думаю, что это должно произойти до вызова Listener.
Затем я попробовал (новую) функциональную версию, и потребление работает нормально,Мне также нравится эта простая версия обработки
@Bean
public Function<List<ExchangeableStock>, List<ExchangeableStock>> stocks() {
return articleService::updateStockInformation;
}
Но тема вывода теперь получает список объектов как одно сообщение, и следующие потребители не работают правильно.
Я думаю, что что-то пропустил...
Нужно ли добавить какой-либо MessageConverter (для версии, управляемой аннотациями), или есть ли способ достичь желаемого поведения с помощью функциональной версии?