Пружинная интеграция конфигурации с сервисной шиной azure - PullRequest
0 голосов
/ 07 февраля 2020

Я пытаюсь настроить входящие и исходящие адаптеры, как указано в примерах удаленного разделения весеннего пакета для компонентов управления и рабочих компонентов. Возникли затруднения, поскольку они настроены в контексте AMQPConnectionFactory. Однако, когда я следую за примерами весенней интеграции, не существует класса, который может предоставить Connection Factory. Помощь приветствуется.

Ниже приведен пример кода: -

import com.microsoft.azure.spring.integration.core.DefaultMessageHandler;
import com.microsoft.azure.spring.integration.core.api.CheckpointConfig;
import com.microsoft.azure.spring.integration.core.api.CheckpointMode;
import com.microsoft.azure.spring.integration.servicebus.inbound.ServiceBusQueueInboundChannelAdapter;
import com.microsoft.azure.spring.integration.servicebus.queue.ServiceBusQueueOperation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.integration.partition.RemotePartitioningManagerStepBuilderFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Configuration
@IntegrationComponentScan
public class ManagerConfiguration {

    private static final int GRID_SIZE = 3;

    private static final String REQUEST_QUEUE_NAME = "digital.intg.batch.cm.request";
    private static final String REPLY_QUEUE_NAME = "digital.intg.batch.cm.reply";
    private static final String MANAGER_INPUT_CHANNEL = "manager.input";
    private static final String MANGER_OUTPUT_CHANNEL = "manager.output";

    private static final Log LOGGER = LogFactory.getLog(ManagerConfiguration.class);
    private final JobBuilderFactory jobBuilderFactory;
    private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;

    public ManagerConfiguration(JobBuilderFactory jobBuilderFactory,
            RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory
    ) {

        this.jobBuilderFactory = jobBuilderFactory;
        this.managerStepBuilderFactory = managerStepBuilderFactory;
    }

    /*
     * Configure outbound flow (requests going to workers)
     */
    @Bean( name = MANGER_OUTPUT_CHANNEL )
    public DirectChannel managerRequests() {
        return new DirectChannel();
    }

    /*
     * Configure inbound flow (replies coming from workers)
     */
    @Bean( name = MANAGER_INPUT_CHANNEL )
    public DirectChannel managerReplies() {
        return new DirectChannel();
    }

    @Bean
    public ServiceBusQueueInboundChannelAdapter managerQueueMessageChannelAdapter(
            @Qualifier( MANAGER_INPUT_CHANNEL ) MessageChannel inputChannel, ServiceBusQueueOperation queueOperation) {
        queueOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
        ServiceBusQueueInboundChannelAdapter adapter = new ServiceBusQueueInboundChannelAdapter(REPLY_QUEUE_NAME,
                queueOperation);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    @ServiceActivator( inputChannel = MANGER_OUTPUT_CHANNEL )
    public MessageHandler managerQueueMessageSender(ServiceBusQueueOperation queueOperation) {
        DefaultMessageHandler handler = new DefaultMessageHandler(REQUEST_QUEUE_NAME, queueOperation);
        handler.setSendCallback(new ListenableFutureCallback<Void>() {
            @Override
            public void onSuccess(Void result) {
                LOGGER.info("Manager Request Message was sent successfully.");
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.info("There was an error sending request message to worker.");
            }
        });

        return handler;
    }

    @Bean
    public IntegrationFlow managerOutboundFlow(MessageHandler managerQueueMessageSender) {
        return IntegrationFlows
                .from(managerRequests())
                .handle(managerQueueMessageSender)
                .get();
    }

    @Bean
    public IntegrationFlow managerInboundFlow(ServiceBusQueueInboundChannelAdapter managerQueueMessageChannelAdapter) {
        return IntegrationFlows
                .from(managerQueueMessageChannelAdapter)
                .channel(managerReplies())
                .get();
    }

    /*
     * Configure the manager step
     */
    @Bean
    public Step managerStep() {
        return this.managerStepBuilderFactory.get("managerStep")
                .partitioner("workerStep", new BasicPartitioner())
                .gridSize(GRID_SIZE)
                .outputChannel(managerRequests())
                .inputChannel(managerReplies())
                //.aggregator()
                .build();
    }

    @Bean
    public Job remotePartitioningJob() {
        return this.jobBuilderFactory.get("remotePartitioningJob")
                .start(managerStep())
                .build();
    }

}

1 Ответ

0 голосов
/ 07 февраля 2020

В образце используется ActiveMQ, потому что он легко встраивается в JVM для наших тестов и образцов. Но вы можете использовать любого другого брокера, которого хотите.

?? что я должен вводить здесь?

Вы должны внедрить любую зависимость, требуемую обработчиком queueMessageChannelAdapter:

.handle(queueMessageChannelAdapter)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...