Невозможно создать горячий поток из очереди - PullRequest
0 голосов
/ 04 июня 2019

У меня есть следующий контроллер rest, который получает запросы, преобразует их в строки JSON и помещает их в параллельную очередь.Я хотел бы сделать Flux из этой очереди и подписаться на него.
К сожалению, это не работает.
Что я здесь не так делаю?

@RestController
public class EventController {
    private final ObjectMapper mapper = new ObjectMapper();
    private final FirehosePutService firehosePutService;
    private ConcurrentLinkedQueue<String> events = new ConcurrentLinkedQueue<>();
    private int batchSize = 10;

    @Autowired
    public EventController(FirehosePutService firehosePutService) {
        this.firehosePutService = firehosePutService;

        Flux<String> eventFlux = Flux.create((FluxSink<String> sink) -> {
            String next;
            while (( next = events.poll()) != null) {
                sink.next(next);
            }
        });

        eventFlux.publish().autoConnect().subscribe(new BaseSubscriber<String>() {
            int consumed;
            List<String> batchOfEvents = new ArrayList<>(batchSize);

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(batchSize);
            }

            @Override
            protected void hookOnNext(String value) {
                batchOfEvents.add(value);
                consumed++;

                if (consumed == batchSize) {
                    batchOfEvents.addAll(events);
                    log.info("Consume {} elements. Size of batchOfEvents={}", consumed, batchOfEvents.size());
                    firehosePutService.saveBulk(batchOfEvents);
                    consumed = 0;
                    batchOfEvents.clear();
                    events.clear();

                    request(batchSize);
                }
            }

        });
    }

    @GetMapping(value = "/saveMany", produces = "text/html")
    public ResponseEntity<Void> saveMany(@RequestParam MultiValueMap<String, String> allRequestParams) throws JsonProcessingException {
        Map<String, String> paramValues = allRequestParams.toSingleValueMap();
        String reignnEvent = mapper.writeValueAsString(paramValues);
        events.add(reignnEvent);

        return new ResponseEntity<>(HttpStatus.OK);
    }
}

1 Ответ

1 голос
/ 04 июня 2019

Прежде всего, вы используете poll метод. Он не блокируется и возвращает null, если очередь пуста. Вы зацикливаете коллекцию до первого null (т. Е. while (next != null), поэтому ваш код выходит из цикла практически сразу, поскольку очередь при запуске пуста. Вы должны заменить poll на take, который блокирует и будет ждать, пока элемент не станет доступен.

Во-вторых, hookOnNext вызывается, когда событие удаляется из очереди. Однако вы пытаетесь снова прочитать события, используя batchOfEvents.addAll(events);. Кроме того, вы также очищаете все ожидающие события events.clear();

Советую удалить прямой доступ к коллекции events из метода hookOnNext.


Почему вы вообще здесь используете Flux? Кажется слишком сложным. Вы можете использовать простую нить здесь

@Autowired
public EventController(FirehosePutService firehosePutService) {
    this.firehosePutService = firehosePutService;

    Thread persister = new Thread(() -> {
        List<String> batchOfEvents = new ArrayList<>(batchSize);

        String next;
        while (( next = events.take()) != null) {
            batchOfEvents.add(value);

            if (batchOfEvents.size() == batchSize) {
                log.info("Consume {} elements. Size of batchOfEvents={}", consumed, batchOfEvents.size());
                firehosePutService.saveBulk(batchOfEvents);
                batchOfEvents.clear();
            }
        }
    });
    persister.start();

}
...