Я пытаюсь настроить следующий поток: попытаться получить блокировку при поступлении сообщения в очередь Rabbit, запросить у удаленного файлового сервера некоторые файлы и отправить новое сообщение в другую очередь для каждого найденного файла и снять блокировкупосле отправки всех файлов.
IntegrationFlows.from(Amqp.inboundGateway(container)
.messageConverter(messageConverter)
)
.filter(m -> lockService.acquire())
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("")
.routingKey(routingKey)
.aggregate()
.handle(m -> {
log.info("Releasing lock");
lock.release();
})
.get();
Проблема в том, что поток останавливается после первого .handle
метода (честно, как и ожидалось), и я не могу понять, как настроить его так, чтобы он делал то, что я хочу?Я попытался использовать .wireTap
и .publishSubscribeChannel
, но это делает 2 потока, которые не зависят друг от друга, и моя блокировка освобождается до фактической отправки файлов.
Было бы здорово, если бы кто-то мог помочь мне объяснитькак это исправить с помощью DSL, потому что я создаю эти потоки динамически ...
Моя попытка установить перехватчик на канале:
final DirectChannel channel = new DirectChannel();
channel.setInterceptors(Collections.singletonList(new ChannelInterceptor() {
@Override
public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
lockService.acquire();
return message;
}
@Override
public void afterSendCompletion(final Message<?> message, final MessageChannel channel, final boolean sent, final Exception ex) {
lock.release();
}
}));
IntegrationFlows.from(Amqp.inboundGateway(container)
.messageConverter(messageConverter)
)
.channel(channel)
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("")
.routingKey(routingKey)
.get();
Но при этом блокировка получается, освобождается и только после этого извлекаются сообщения.Что я делаю не так?
Понял это из справки в чате Gitter, на случай, если кто-то застрянет:
IntegrationFlows.from(Amqp.inboundGateway(container)
.messageConverter(messageConverter)
)
.channel(MessageChannels.direct().interceptor(new ChannelInterceptor() {
@Override
public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
lockService.acquire();
return message;
}
@Override
public void afterSendCompletion(final Message<?> message, final MessageChannel channel, final boolean sent, final Exception ex) {
lockService.release();
}
}))
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("")
.routingKey(routingKey)
.get());