Скорость потока улья - Реактивные потоки Явы или Кафки - PullRequest
0 голосов
/ 18 апреля 2019

У меня высокоскоростная конечная точка с определенной полезной нагрузкой. В очереди 800 активных слушателей. Затем мне нужно генерировать события из этой полезной нагрузки. Может быть X количество событий от каждой полезной нагрузки. Затем мне нужно объединить эти события в сегменты Y, которые будут отправлены другому сервису через http. HTTP-вызовы требуют времени, поэтому мне нужно, чтобы они были асинхронными и параллельными (дополнительное количество потоков Z для выполнения этих вызовов). Поток должен быть высокоэффективным.

То, что у меня сейчас есть, это. Слушатели http вызывают сервис, который генерирует события и помещает их в ConcurrentLinkedQueue затем вызовите ResettableCountDownLatch.countDown (установите для ожидания Y).

У меня есть пул потоков, в котором каждый исполнитель ожидает ResettableCountDownLatch, затем проверяет, имеет ли очередь больше Y, если да, тогда опрашивает события Y из очереди и выполняет вызов.

Я хотел бы услышать, есть ли более эффективный способ работы здесь, любые проекты с открытым исходным кодом, которые подходят здесь (я слышал о Reactive (при весенней загрузке 2) или Kafka Streams)

doneSignal = new ResettableCountDownLatch(y);
executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(senderThreads);
    executor.submit(() -> {
        while (true) {
            try {
                doneSignal.await();
                doneSignal.reset();
                if (concurrentQueue.size() > bulkSize) {
                    log.debug("concurrentQueue size is: {}, going to start new thread", concurrentQueue.size());
                    executor.submit(() -> {
                        long start = System.currentTimeMillis();
                        while (concurrentQueue.size() > bulkSize) {
                            List<ObjectNode> events = populateList(concurrentQueue, bulkSize);
                            log.debug("got: {} from concurrentQueue, it took: {}", events.size(), (System.currentTimeMillis() - start));
                            String eventWrapper = wrapEventsInBulk(events);
                            sendEventToThirdPary(eventWrapper);
                        }
                    });
                }
            } catch (Exception e) {

            }
        }
...