Я работаю над приложением, использующим Spring Boot 1.5.13.RELEASE и Spring Integration 4.3.16.RELEASE.
Я довольно новичок в Spring Integration и столкнулся с проблемой.
Таким образом, основная идея заключается в том, что на некоторых внешних триггерах (может быть и HTTP-вызов) мне нужно создать IntegrationFlow, который будет принимать сообщения из очереди rabbitMQ, выполнять некоторую работу с ними и затем (возможно) создавать для другогоКонечная точка rabbitMQ.
Теперь это должно происходить много раз, поэтому мне придется создать множественный IntegrationFlow.
Я использую IntegrationFlowContext зарегистрировать каждый из IntegrationFlow следующим образом:
IntegrationFlowContext flowContext;
...
IntegrationFlow integrationFlow = myFlowFactory.makeFlow(uuid);
...
flowContext.registration(integrationFlow).id(callUUID).register();
Я должен уточнить, что это может происходить одновременно, создавая множественный IntegrationFlows одновременно.
Поэтому каждый раз, когда я пытаюсь создать IntegrationFlow, мой «источник» - это шлюз, который выглядит следующим образом:
MessagingGatewaySupport sourceGateway = Amqp
.inboundGateway(rabbitTemplate.getConnectionFactory(), rabbitTemplate, dynamicQueuePrefix+uuid)
.concurrentConsumers(1)
.adviceChain(retryInterceptor)
.autoStartup(false)
.id("sgX-" + uuid)
.get();
Это не @Bean (пока), но я ожидаю, что он получитt регистрируется при регистрации каждого IntegrationFlow.
Моя "цель" - это AmqpOutBoundAdapter, который выглядит следующим образом:
@Bean
public AmqpOutboundEndpoint outboundAdapter(
RabbitTemplate rabbitTemplate,
ApplicationMessagingProperties applicationMessagingProperties
) {
return Amqp.outboundAdapter(rabbitTemplate)
.exchangeName("someStandardExchange")
.routingKeyExpression("headers.get('rabbitmq.ROUTING_KEY')")
.get();
}
Теперь этот IS компонент уже есть и являетсявводится каждый раз, когда я пытаюсь создать поток.
И мой поток (ы) выглядит следующим образом:
public IntegrationFlow configure() {
return IntegrationFlows
.from(sourceGateway)
.transform(Transformers.fromJson(HashMap.class, jsonObjectMapper))
.filter(injectedGenericSelectorFilter)
.<HashMap<String, String>>handle((payload, headers) -> {
String uuid = payload.get("uuid");
boolean shouldForwardMessage = myInjectedApplicationService.isForForwarding(payload);
myInjectedApplicationService.handlePayload(payload);
return MessageBuilder
.withPayload(payload)
.setHeader("shouldForward", shouldForwardMessage)
.setHeader("rabbitmq.ROUTING_KEY", uuid)
.build();
})
.filter("headers.get('shouldForward').equals(true)")
.transform(Transformers.toJson(jsonObjectMapper))
.handle(outboundAdapter)
.get();
}
Моя проблема заключается в том, что пока приложение запускается нормально и создает первыйIntegrationFlows и др.позже я получаю исключения такого рода:
java.lang.IllegalStateException: не удалось зарегистрировать объект [org.springframework.integration.transformer.MessageTransformingHandler # 872] под именем компонента 'org.springframework.integration.transformer.MessageTransformingHandler # 872 ': уже существует объект [org.springframework.integration.transformer.MessageTransformingHandler # 872] bound
Я даже пытался установить идентификатор для каждого из используемых компонентов, который должен использоваться в качестве beanName, например:
.transform(Transformers.fromJson(HashMap.class, jsonObjectMapper), tf -> tf.id("tf1-"+uuid))
Но, хотя проблемы с именами bean-компонентов в таких компонентах, как .filter, были решены, я все равно получаю то же исключение для MessageTransformingHandler.
ОБНОВЛЕНИЕ
Я не упомянул тот факт, что как только каждый IntegrationFlow
завершает свою работу, он удаляется с помощью IntegrationFlowContext
следующим образом:
flowContext.remove(flowId);
Итак, похоже, что (вроде) работает синхронизация как блока регистрации потока , так и блока поток удаляет блок , используя тот же объект, что и блокировка.
Итак, мой класс, отвечающий за регистрацию и удаление потоков, выглядит так:
...
private final Object lockA = new Object();
...
public void appendNewFlow(String callUUID){
IntegrationFlow integrationFlow = myFlowFactory.makeFlow(callUUID);
synchronized (lockA) {
flowContext.registration(integrationFlow).id(callUUID).register();
}
}
public void removeFlow(String flowId){
synchronized (lockA) {
flowContext.remove(flowId);
}
}
...
Моя проблема сейчастакого рода блокировки довольно тяжело для приложения, так как я получаю довольно много:
...Waiting for workers to finish.
...
...Successfully waited for workers to finish.
, что происходит не так быстро, как хотелось бы.
НоЯ предполагаю, что это ожидается, поскольку каждый раз, когда поток получает блокировку, потребуется некоторое время, чтобы либо зарегистрировать поток и все его компоненты, либо отменить регистрацию потока и всех его компонентов.