Пружина DSL обрабатывает после потока? - PullRequest
0 голосов
/ 23 января 2019

Я пытаюсь настроить следующий поток: попытаться получить блокировку при поступлении сообщения в очередь Rabbit, запросить у удаленного файлового сервера некоторые файлы и отправить новое сообщение в другую очередь для каждого найденного файла и снять блокировкупосле отправки всех файлов.

IntegrationFlows.from(Amqp.inboundGateway(container)
    .messageConverter(messageConverter)
)
    .filter(m -> lockService.acquire())
    .transform(m -> remoteFileTemplate.list(inputDirectory))
    .split()
    .handle(Amqp.outboundAdapter(amqpTemplate)
        .exchangeName("")
        .routingKey(routingKey)
    .aggregate()
    .handle(m -> {
      log.info("Releasing lock");
      lock.release();
    })
    .get();

Проблема в том, что поток останавливается после первого .handle метода (честно, как и ожидалось), и я не могу понять, как настроить его так, чтобы он делал то, что я хочу?Я попытался использовать .wireTap и .publishSubscribeChannel, но это делает 2 потока, которые не зависят друг от друга, и моя блокировка освобождается до фактической отправки файлов.

Было бы здорово, если бы кто-то мог помочь мне объяснитькак это исправить с помощью DSL, потому что я создаю эти потоки динамически ...

  • EDIT -

Моя попытка установить перехватчик на канале:

final DirectChannel channel = new DirectChannel();
channel.setInterceptors(Collections.singletonList(new ChannelInterceptor() {
  @Override
  public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
    lockService.acquire();
    return message;
  }

  @Override
  public void afterSendCompletion(final Message<?> message, final MessageChannel channel, final boolean sent, final Exception ex) {
    lock.release();
  }
}));
IntegrationFlows.from(Amqp.inboundGateway(container)
    .messageConverter(messageConverter)
)
    .channel(channel)
    .transform(m -> remoteFileTemplate.list(inputDirectory))
    .split()
    .handle(Amqp.outboundAdapter(amqpTemplate)
        .exchangeName("")
        .routingKey(routingKey)
    .get();

Но при этом блокировка получается, освобождается и только после этого извлекаются сообщения.Что я делаю не так?

  • РЕДАКТИРОВАТЬ 2 -

Понял это из справки в чате Gitter, на случай, если кто-то застрянет:

IntegrationFlows.from(Amqp.inboundGateway(container)
    .messageConverter(messageConverter)
)
.channel(MessageChannels.direct().interceptor(new ChannelInterceptor() {
  @Override
  public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
    lockService.acquire();
    return message;
  }

  @Override
  public void afterSendCompletion(final Message<?> message, final MessageChannel channel, final boolean sent, final Exception ex) {
    lockService.release();
  }
}))
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
    .exchangeName("")
    .routingKey(routingKey)
    .get());

1 Ответ

0 голосов
/ 23 января 2019

pub / sub после разделения, с обработчиком AMQP на одном подпотоке и агрегатором на другом должно работать нормально.

Каждый будет вызываться последовательно в одном и том же потоке, а последнее сообщение, вызывающее освобождение из агрегатора, снова будет вызываться в том же потоке.

Сказав это, вам потребуется некоторая обработка errorChannel на входящем шлюзе, чтобы снять блокировку в случае возникновения ошибки.

EDIT

Менее сложным решением было бы пользовательское ChannelInterceptor на канале до преобразования вместо фильтра, чтобы заблокировать блокировку в preSend() и снять ее в afterSendCompleted() (которая вызывается как для успеха, так и для неудачи) .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...