Элементы предварительной загрузки для Flux.generate (...) - PullRequest
3 голосов
/ 13 января 2020

Я создаю Flux с помощью Flux.generate (). Генератор (Потребитель) фактически читает из очереди сообщений. Проблема в том, что этот вызов занимает довольно много времени (иногда даже 1-2 секунды). Это заставит поток прекратить обработку.

package com.github.loa.vault.service.listener;

import com.github.loa.document.service.domain.DocumentType;
import com.github.loa.queue.service.QueueManipulator;
import com.github.loa.queue.service.domain.Queue;
import com.github.loa.queue.service.domain.message.DocumentArchivingMessage;
import com.github.loa.vault.service.domain.DocumentArchivingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.SynchronousSink;

import java.util.function.Consumer;

@Slf4j
@Service
@RequiredArgsConstructor
public class VaultQueueConsumer implements Consumer<SynchronousSink<DocumentArchivingContext>> {

    private final QueueManipulator queueManipulator;

    @Override
    public void accept(final SynchronousSink<DocumentArchivingContext> documentSourceItemSynchronousSink) {
        final DocumentArchivingMessage documentArchivingMessage = (DocumentArchivingMessage)
                queueManipulator.readMessage(Queue.DOCUMENT_ARCHIVING_QUEUE);

        documentSourceItemSynchronousSink.next(
                DocumentArchivingContext.builder()
                        .type(DocumentType.valueOf(documentArchivingMessage.getType()))
                        .source(documentArchivingMessage.getSource())
                        .content(documentArchivingMessage.getContent())
                        .build()
        );
    }
}

Очевидно, что добавление parallel не поможет, потому что генератор все еще вызывается по одному.

Flux.generate(vaultQueueConsumer)
    .parallel()
    .runOn(Schedulers.parallel()) 
    .flatMap(vaultDocumentManager::archiveDocument)
    .subscribe();

Кто-нибудь делает? знаете, как сделать генератор параллельным? Я не хочу использовать Flux.create(), потому что тогда я потеряю противодавление.

Ответы [ 3 ]

1 голос
/ 23 января 2020

Проблема в том, что vaultQueueConsumer включает медленную работу. Таким образом, решение состоит в том, чтобы извлечь эту медленную операцию из generate в map, которую можно распараллелить.

В качестве идеи вы можете создать имя очереди, из которой должны поступать сообщения, и выполнять фактическую потребление сообщения в методе map после параллельного потока:

String queue = "test";
Flux.<String>generate(synchronousSink -> synchronousSink.next(queue))
    .parallel()
    .runOn(Schedulers.parallel())
    .map(queueManipulator::readMessage)
    .doOnNext(log::info)
    .subscribe();

Подделка QueueManipulator спит 1-2 секунды перед возвратом сообщения:

public class QueueManipulator {

  private final AtomicLong counter = new AtomicLong();

  public String readMessage(String queue) {
    sleep(); //sleep 1-2 seconds
    return queue + " " + counter.incrementAndGet();
  }
  //...
}

Таким образом, потребление сообщения выполняется параллельно:

12:49:22.362 [parallel-4] - test 2
12:49:22.362 [parallel-3] - test 4
12:49:22.362 [parallel-2] - test 1
12:49:22.362 [parallel-1] - test 3
12:49:23.369 [parallel-3] - test 6
12:49:23.369 [parallel-1] - test 5
12:49:23.369 [parallel-2] - test 7
12:49:23.369 [parallel-4] - test 8

Приведенное выше решение является простой покупкой, может выглядеть как «взлом».

Другая идея - позвонить Flux.generate в flatMap:

String queue = "test";
int parallelism = 5;
Flux.range(0, parallelism)
    .flatMap(i ->
        Flux.<String>generate(synchronousSink -> {
          synchronousSink.next(queueManipulator.readMessage(queue));
        }).subscribeOn(Schedulers.parallel()))
    .doOnNext(log::info)
    .subscribe();
1 голос
/ 25 января 2020
Mono.just(1).repeat()  // create infinite flux, maybe there is a nicer way for that?
    .flatMap(this::readFromQueue, 100) // define queue polling concurrency
    .flatMap(this::archiveDocument)
    .subscribe();
private Mono<String> readFromQueue(Integer ignore)
{
    return Mono.fromCallable(() -> {
        Thread.sleep(1500); // your actual blocking queue polling here
        return "queue_element";
    }).subscribeOn(Schedulers.elastic()); // dedicate blocking call to threadpool
}
0 голосов
/ 20 января 2020

Вы пробовали:

Flux.generate(vaultQueueConsumer)
 .parallel()
 .runOn(Schedulers.parallel()) 
 .flatMap(vaultDocumentManager::archiveDocument)
 .subscribe();
...