Spring Integration: Ручная обработка каналов - PullRequest
0 голосов
/ 14 октября 2019

Что я хочу: создать настраиваемую библиотеку, которая

  • использует другую библиотеку с внутренней маршрутизацией и методом подписки, например: clientInstance.subscribe(endpoint, (endpoint, message) -> <handler>), например, библиотека Paho MQTT
  • позжев моем коде я хочу получить доступ к сообщениям в Flux.

Моя идея:

  • создать MessageChannels примерно так:

    integrationFlowContext
        .registration(IntegrationFlows.from("message-channel:" + endpoint)).bridge().get())
        .register()
    
  • переслать реактивным издателям:

    applicationContext.registerBean(
         "publisher:" + endpoint,
         Publisher.class,
         () -> IntegrationFlows.from("message-channel:" +     endpoint)).toReactivePublisher()
        );
    
  • сохранить каналы сообщений в наборе или аналогичном и реализовать описанный выше обработчик: (endpoint, message) -> messageChannels.get(endpoint).send( <converter>(message))

  • позднее использование (в методе @PostConstruct):

    Flux
       .from((Publihser<Message<?>>)applicationContext.getBean("publisher:" + enpoint))
       .map(...)
       .subscribe()
    

Я сомневаюсь, что это лучший способ сделать то, что я хочу. Похоже на злоупотребление весенней интеграцией. На данный момент приветствуются любые предложения.

В целом, однако (по крайней мере, в моих тестах) это работает. Но когда я запускаю свое приложение, я получаю сообщения об ошибках типа: «Причина: org.springframework.messaging.core.DestinationResolutionException: нет заголовка output-channel или replyChannel».

Это особенно плохо, так как после этогоИсключение - издатели утверждают, что у них больше нет подписчика. Таким образом, в реальном приложении сообщения больше не обрабатываются.

Я не уверен, что означает это сообщение, но могу воспроизвести его (но не понимаю, почему):

@Test
public void channelTest() {
    integrationFlowContext
            .registration(
                    IntegrationFlows.from("any-channel").bridge().get()
            )
            .register();

    registryUtil.registerBean(
            "any-publisher",
            Publisher.class,
            () -> IntegrationFlows.from("any-channel").toReactivePublisher()
    );

    Flux
            .from((Publisher<Message<?>>) applicationContext.getBean("any-publisher"))
            .subscribe(System.out::println);

    MessageChannel messageChannel = applicationContext.getBean("any-channel", MessageChannel.class);
    try {
        messageChannel.send(MessageBuilder.withPayload("test").build());
    } catch (Throwable t) {
        log.error("Error: ", t);
    }

}

Я, конечно, читаю части документации по весенней интеграции, но не совсем понимаю, что происходит за кулисами. Таким образом, мне хочется угадать возможные причины ошибок.

РЕДАКТИРОВАТЬ:

Это, однако, работает:

@TestConfiguration
static class Config {

    GenericApplicationContext applicationContext;
    Config(
            GenericApplicationContext applicationContext,
            IntegrationFlowContext integrationFlowContext
    ) {
        this.applicationContext = applicationContext;
        // optional here, but needed for some reason in my library,
        // since I can't find the channel beans like I will do here,
        // if I didn't register them like so:
        //integrationFlowContext
        //    .registration(
        //    IntegrationFlows.from("any-channel").bridge().get())
        //    .register();

        applicationContext.registerBean(
                "any-publisher",
                Publisher.class,
                () -> IntegrationFlows.from("any-channel").toReactivePublisher()
        );

    }

    @PostConstruct
    void connect(){
        Flux
                .from((Publisher<Message<?>>) applicationContext.getBean("any-publisher"))
                .subscribe(System.out::println);
    }

}

@Autowired
ApplicationContext applicationContext;

@Autowired
IntegrationFlowContext integrationFlowContext;

@Test
@SneakyThrows
public void channel2Test() {

    MessageChannel messageChannel = applicationContext.getBean("any-channel", MessageChannel.class);
    try {
        messageChannel.send(MessageBuilder.withPayload("test").build());
    } catch (Throwable t) {
        log.error("Error: ", t);
    }

}

Таким образом, очевидно, моя проблема вышеРеализованы ли сообщения, приходящие «слишком рано» .. Думаю?!

1 Ответ

1 голос
/ 14 октября 2019

Нет, ваша проблема связана с round-robin , отправленным на DirectChannel для имени компонента any-channel.

Вы определяете два экземпляра IntegrationFlow, начиная с этого каналаи затем вы объявляете своих собственных подписчиков, но во время выполнения они оба подписываются на один и тот же экземпляр any-channel. И этот по умолчанию поставляется с балансировкой round-robin . Итак, одно сообщение отправляется вашему Flux.from() подписчику, а другое - тому 1011 *, который не знает, что делать с вашим сообщением, поэтому он пытается разрешить заголовок replyChannel.

Поэтому ваше решение только с одним IntegrationFlows.from("any-channel").toReactivePublisher() является правильным. Хотя вы можете просто зарегистрировать FluxMessageChannel и использовать его с одной стороны для регулярной отправки сообщений, а с другой стороны как реактивный источник для Flux.from().

...