Я пытаюсь создать Flux
из входящих сообщений, полученных из очереди.
Например, если я использую Amazon SQS, как мне написать следующий код:
Flux<String> messages = connectionToSQS.receiveFromQueue(queueName);
messages.map (s -> log.info ("message: {}", s) .subscribe ();
После экспериментов я обнаружил следующие проблемы:
- Как мне продолжать запрашивать сообщения из очереди (цикл навсегда)? Создать ли один поток с циклом, который продолжает запрашивать из очереди?
- Как мне сделать
Flux
холодным? Я не хочу запрашивать сообщения от SQS, пока потребитель не попросит об этом. Это позволяет мне использовать противодавление.
При первом прохождении этой проблемы получилось что-то вроде следующего кода согласно документации Reactor:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
Идея состоит в том, чтобы создать отдельный поток, который продолжает запрашивать сообщения в цикле, а затем использовать шаблон наблюдателя, как указано выше, для выполнения next()
для каждого полученного сообщения.