Создать поток из сообщений в очереди SQS - PullRequest
0 голосов
/ 15 ноября 2018

Я пытаюсь создать Flux из входящих сообщений, полученных из очереди.

Например, если я использую Amazon SQS, как мне написать следующий код:

Flux<String> messages = connectionToSQS.receiveFromQueue(queueName);

messages.map (s -> log.info ("message: {}", s) .subscribe ();

После экспериментов я обнаружил следующие проблемы:

  • Как мне продолжать запрашивать сообщения из очереди (цикл навсегда)? Создать ли один поток с циклом, который продолжает запрашивать из очереди?
  • Как мне сделать Flux холодным? Я не хочу запрашивать сообщения от SQS, пока потребитель не попросит об этом. Это позволяет мне использовать противодавление.

При первом прохождении этой проблемы получилось что-то вроде следующего кода согласно документации Reactor:

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register(new MyEventListener<String>() { 
        public void onDataChunk(List<String> chunk) {
            for(String s : chunk) {
                sink.next(s); 
            }
        }

        public void processComplete() {
            sink.complete(); 
        }
    });
});

Идея состоит в том, чтобы создать отдельный поток, который продолжает запрашивать сообщения в цикле, а затем использовать шаблон наблюдателя, как указано выше, для выполнения next() для каждого полученного сообщения.

...