Без SpEL в filter()
вы можете вместо этого использовать лямбду Java:
.filter(Message.class, m -> m.getHeaders().get("type") == "TYPE_A")
Spring Cloud Stream - это самоуверенная платформа, где JSON является типом содержимого по умолчанию для данных, проходящих через поток иза пределами / из целевой системы обмена сообщениями.
Spring Integration - это библиотека, позволяющая создавать интеграционные приложения.Там у нас просто не может быть никакого мнения относительно какой-либо конвертации типов контента по умолчанию.Там нет никаких нестандартных предположений, что вы собираетесь преобразовать свой входящий byte[]
в какой-нибудь POJO из-за JSON.Несмотря на то, что мы немного учитываем некоторые возможности, которые видны так же, как и Spring Cloud Stream, в обработчике метода POJO есть ловушка для преобразования из JSON в ожидаемый POJO.Но это делается только для пользовательских методов POJO, когда они также помечены @serviceActivator
.Отсюда мы не можем принять ваши ожидания в .transform()
лямбде.Вам нужно иметь какой-то сервис с методом и использовать его в:
/**
* Populate a {@link ServiceActivatingHandler} for the
* {@link org.springframework.integration.handler.MethodInvokingMessageProcessor}
* to invoke the {@code method} for provided {@code bean} at runtime.
* In addition accept options for the integration endpoint using {@link GenericEndpointSpec}.
* @param service the service object to use.
* @param methodName the method to invoke.
* @return the current {@link IntegrationFlowDefinition}.
*/
public B handle(Object service, String methodName) {
Таким образом, он будет работать так же, как в Spring Cloud Stream с @StreamListener
.