Spring интеграция Java DSL - динамически создавать IntegrationFlows - PullRequest
0 голосов
/ 17 мая 2018

Я работаю над приложением, использующим Spring Boot 1.5.13.RELEASE и Spring Integration 4.3.16.RELEASE.

Я довольно новичок в Spring Integration и столкнулся с проблемой.

Таким образом, основная идея заключается в том, что на некоторых внешних триггерах (может быть и HTTP-вызов) мне нужно создать IntegrationFlow, который будет принимать сообщения из очереди rabbitMQ, выполнять некоторую работу с ними и затем (возможно) создавать для другогоКонечная точка rabbitMQ.

Теперь это должно происходить много раз, поэтому мне придется создать множественный IntegrationFlow.

Я использую IntegrationFlowContext зарегистрировать каждый из IntegrationFlow следующим образом:

IntegrationFlowContext flowContext;
...
IntegrationFlow integrationFlow = myFlowFactory.makeFlow(uuid);
...
flowContext.registration(integrationFlow).id(callUUID).register();

Я должен уточнить, что это может происходить одновременно, создавая множественный IntegrationFlows одновременно.

Поэтому каждый раз, когда я пытаюсь создать IntegrationFlow, мой «источник» - это шлюз, который выглядит следующим образом:

MessagingGatewaySupport sourceGateway = Amqp
        .inboundGateway(rabbitTemplate.getConnectionFactory(), rabbitTemplate, dynamicQueuePrefix+uuid)
        .concurrentConsumers(1)
        .adviceChain(retryInterceptor)
        .autoStartup(false)
        .id("sgX-" + uuid)
        .get();

Это не @Bean (пока), но я ожидаю, что он получитt регистрируется при регистрации каждого IntegrationFlow.

Моя "цель" - это AmqpOutBoundAdapter, который выглядит следующим образом:

@Bean
public AmqpOutboundEndpoint outboundAdapter(
        RabbitTemplate rabbitTemplate,
        ApplicationMessagingProperties applicationMessagingProperties
) {
    return Amqp.outboundAdapter(rabbitTemplate)
            .exchangeName("someStandardExchange")
            .routingKeyExpression("headers.get('rabbitmq.ROUTING_KEY')")
            .get();
}

Теперь этот IS компонент уже есть и являетсявводится каждый раз, когда я пытаюсь создать поток.

И мой поток (ы) выглядит следующим образом:

public IntegrationFlow configure() {
    return IntegrationFlows
            .from(sourceGateway)
            .transform(Transformers.fromJson(HashMap.class, jsonObjectMapper))
            .filter(injectedGenericSelectorFilter)
            .<HashMap<String, String>>handle((payload, headers) -> {

                String uuid = payload.get("uuid");

                boolean shouldForwardMessage = myInjectedApplicationService.isForForwarding(payload);
                myInjectedApplicationService.handlePayload(payload);

                return MessageBuilder
                        .withPayload(payload)
                        .setHeader("shouldForward", shouldForwardMessage)
                        .setHeader("rabbitmq.ROUTING_KEY", uuid)
                        .build();
            })
            .filter("headers.get('shouldForward').equals(true)")
            .transform(Transformers.toJson(jsonObjectMapper))
            .handle(outboundAdapter)
            .get();
}

Моя проблема заключается в том, что пока приложение запускается нормально и создает первыйIntegrationFlows и др.позже я получаю исключения такого рода:

java.lang.IllegalStateException: не удалось зарегистрировать объект [org.springframework.integration.transformer.MessageTransformingHandler # 872] под именем компонента 'org.springframework.integration.transformer.MessageTransformingHandler # 872 ': уже существует объект [org.springframework.integration.transformer.MessageTransformingHandler # 872] bound

Я даже пытался установить идентификатор для каждого из используемых компонентов, который должен использоваться в качестве beanName, например:

.transform(Transformers.fromJson(HashMap.class, jsonObjectMapper), tf -> tf.id("tf1-"+uuid))

Но, хотя проблемы с именами bean-компонентов в таких компонентах, как .filter, были решены, я все равно получаю то же исключение для MessageTransformingHandler.


ОБНОВЛЕНИЕ

Я не упомянул тот факт, что как только каждый IntegrationFlow завершает свою работу, он удаляется с помощью IntegrationFlowContext следующим образом:

flowContext.remove(flowId);

Итак, похоже, что (вроде) работает синхронизация как блока регистрации потока , так и блока поток удаляет блок , используя тот же объект, что и блокировка.

Итак, мой класс, отвечающий за регистрацию и удаление потоков, выглядит так:

...
private final Object lockA = new Object();
...

public void appendNewFlow(String callUUID){
    IntegrationFlow integrationFlow = myFlowFactory.makeFlow(callUUID);

    synchronized (lockA) {
        flowContext.registration(integrationFlow).id(callUUID).register();
    }
}

public void removeFlow(String flowId){

    synchronized (lockA) {
        flowContext.remove(flowId); 
    }

}
...

Моя проблема сейчастакого рода блокировки довольно тяжело для приложения, так как я получаю довольно много:

...Waiting for workers to finish.
...
...Successfully waited for workers to finish.

, что происходит не так быстро, как хотелось бы.

НоЯ предполагаю, что это ожидается, поскольку каждый раз, когда поток получает блокировку, потребуется некоторое время, чтобы либо зарегистрировать поток и все его компоненты, либо отменить регистрацию потока и всех его компонентов.

1 Ответ

0 голосов
/ 17 мая 2018

У вас также есть этот:

.transform(Transformers.toJson(jsonObjectMapper))

Как это работает, если вы добавляете туда также .id()?

С другой стороны, так как вы говорите, что этопроисходит одновременно, мне интересно, можете ли вы сделать какой-то фрагмент вашего кода synchonized, например, обернуть его flowContext.registration(integrationFlow).id(callUUID).register();.

Процесс определения и регистрации bean-компонента на самом деле не является поточно-ориентированным и предназначен для использования только изодин, инициализирующий поток в начале жизненного цикла приложения.

Вероятно, нам действительно нужно сделать IntegrationFlowContext поточно-безопасным в его функции register(IntegrationFlowRegistrationBuilder builder) или, по крайней мере, в registerBean(Object bean, String beanName, String parentName), поскольку это именно то место, где мы генерируем имя компонента и регистрируем его.

Не стесняйтесь поднимать JIRA по этому вопросу.

К сожалению, проект расширения Java DSL Spring Integration уже не поддерживается, и мы можем добавить исправление только к текущему поколению 5.x.Тем не менее я считаю, что synchonized обходной путь должен работать здесь, поэтому нет необходимости переносить его обратно в расширение Java DSL Spring Integration.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...