Мост RabbitMQ слушателя к Flux - PullRequest
0 голосов
/ 26 мая 2020

У меня есть приложение Reactive Spring Boot, получающее сообщения из RabbitMQ и сохраняющее их в репозитории (MongoDB):

@RabbitListener(...)
public void processMessage(Message message) {
    repository.persist(message).subscribe();
}
* 1003 ConnectionPool к базе данных. Если бы я получил сообщения в Flux, я мог бы concatMap() их в db или вставить их в группы из n документов.

Вот почему я попытался реализовать мост данного слушателя RabbitMQ к самоуправляемому Flux:

@Component
public class QueueListenerController {

    private final MyMongoRepository repository;
    private final FluxProcessor<Message, Message> fluxProcessor;
    private final FluxSink<Message> fluxSink;

    public QueueListenerController(MyMongoRepository repository) {
        this.repository = repository;
        this.fluxProcessor = DirectProcessor.<Message>create().serialize();
        this.fluxSink = fluxProcessor.sink();
    }

    @PostConstruct
    private void postConstruct() {
        fluxProcessor.concatMap(repository::persist)
                .subscribe();
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "my-queue", durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "amq.direct", durable = "true", autoDelete = "false")
    ))
    public void processMessage(Message message) {
        fluxSink.next(message);
    }
}

Это работает локально и в течение определенного периода времени, но через некоторое время ( Я ожидаю 12-24 часа) он перестает сохранять сообщения в базе данных, поэтому я совершенно уверен, что делаю что-то не так.

Каким будет правильный способ преобразования входящих сообщений RabbitMQ в Flux сообщений?

...