Что я хочу: создать настраиваемую библиотеку, которая
- использует другую библиотеку с внутренней маршрутизацией и методом подписки, например:
clientInstance.subscribe(endpoint, (endpoint, message) -> <handler>)
, например, библиотека Paho MQTT - позжев моем коде я хочу получить доступ к сообщениям в
Flux
.
Моя идея:
создать MessageChannels примерно так:
integrationFlowContext
.registration(IntegrationFlows.from("message-channel:" + endpoint)).bridge().get())
.register()
переслать реактивным издателям:
applicationContext.registerBean(
"publisher:" + endpoint,
Publisher.class,
() -> IntegrationFlows.from("message-channel:" + endpoint)).toReactivePublisher()
);
сохранить каналы сообщений в наборе или аналогичном и реализовать описанный выше обработчик: (endpoint, message) -> messageChannels.get(endpoint).send( <converter>(message))
позднее использование (в методе @PostConstruct
):
Flux
.from((Publihser<Message<?>>)applicationContext.getBean("publisher:" + enpoint))
.map(...)
.subscribe()
Я сомневаюсь, что это лучший способ сделать то, что я хочу. Похоже на злоупотребление весенней интеграцией. На данный момент приветствуются любые предложения.
В целом, однако (по крайней мере, в моих тестах) это работает. Но когда я запускаю свое приложение, я получаю сообщения об ошибках типа: «Причина: org.springframework.messaging.core.DestinationResolutionException: нет заголовка output-channel или replyChannel».
Это особенно плохо, так как после этогоИсключение - издатели утверждают, что у них больше нет подписчика. Таким образом, в реальном приложении сообщения больше не обрабатываются.
Я не уверен, что означает это сообщение, но могу воспроизвести его (но не понимаю, почему):
@Test
public void channelTest() {
integrationFlowContext
.registration(
IntegrationFlows.from("any-channel").bridge().get()
)
.register();
registryUtil.registerBean(
"any-publisher",
Publisher.class,
() -> IntegrationFlows.from("any-channel").toReactivePublisher()
);
Flux
.from((Publisher<Message<?>>) applicationContext.getBean("any-publisher"))
.subscribe(System.out::println);
MessageChannel messageChannel = applicationContext.getBean("any-channel", MessageChannel.class);
try {
messageChannel.send(MessageBuilder.withPayload("test").build());
} catch (Throwable t) {
log.error("Error: ", t);
}
}
Я, конечно, читаю части документации по весенней интеграции, но не совсем понимаю, что происходит за кулисами. Таким образом, мне хочется угадать возможные причины ошибок.
РЕДАКТИРОВАТЬ:
Это, однако, работает:
@TestConfiguration
static class Config {
GenericApplicationContext applicationContext;
Config(
GenericApplicationContext applicationContext,
IntegrationFlowContext integrationFlowContext
) {
this.applicationContext = applicationContext;
// optional here, but needed for some reason in my library,
// since I can't find the channel beans like I will do here,
// if I didn't register them like so:
//integrationFlowContext
// .registration(
// IntegrationFlows.from("any-channel").bridge().get())
// .register();
applicationContext.registerBean(
"any-publisher",
Publisher.class,
() -> IntegrationFlows.from("any-channel").toReactivePublisher()
);
}
@PostConstruct
void connect(){
Flux
.from((Publisher<Message<?>>) applicationContext.getBean("any-publisher"))
.subscribe(System.out::println);
}
}
@Autowired
ApplicationContext applicationContext;
@Autowired
IntegrationFlowContext integrationFlowContext;
@Test
@SneakyThrows
public void channel2Test() {
MessageChannel messageChannel = applicationContext.getBean("any-channel", MessageChannel.class);
try {
messageChannel.send(MessageBuilder.withPayload("test").build());
} catch (Throwable t) {
log.error("Error: ", t);
}
}
Таким образом, очевидно, моя проблема вышеРеализованы ли сообщения, приходящие «слишком рано» .. Думаю?!