У меня высокоскоростная конечная точка с определенной полезной нагрузкой.
В очереди 800 активных слушателей.
Затем мне нужно генерировать события из этой полезной нагрузки. Может быть X количество событий от каждой полезной нагрузки.
Затем мне нужно объединить эти события в сегменты Y, которые будут отправлены другому сервису через http.
HTTP-вызовы требуют времени, поэтому мне нужно, чтобы они были асинхронными и параллельными (дополнительное количество потоков Z для выполнения этих вызовов).
Поток должен быть высокоэффективным.
То, что у меня сейчас есть, это.
Слушатели http вызывают сервис, который генерирует события и помещает их в ConcurrentLinkedQueue
затем вызовите ResettableCountDownLatch.countDown (установите для ожидания Y).
У меня есть пул потоков, в котором каждый исполнитель ожидает ResettableCountDownLatch, затем проверяет, имеет ли очередь больше Y, если да, тогда опрашивает события Y из очереди и выполняет вызов.
Я хотел бы услышать, есть ли более эффективный способ работы здесь, любые проекты с открытым исходным кодом, которые подходят здесь (я слышал о Reactive (при весенней загрузке 2) или Kafka Streams)
doneSignal = new ResettableCountDownLatch(y);
executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(senderThreads);
executor.submit(() -> {
while (true) {
try {
doneSignal.await();
doneSignal.reset();
if (concurrentQueue.size() > bulkSize) {
log.debug("concurrentQueue size is: {}, going to start new thread", concurrentQueue.size());
executor.submit(() -> {
long start = System.currentTimeMillis();
while (concurrentQueue.size() > bulkSize) {
List<ObjectNode> events = populateList(concurrentQueue, bulkSize);
log.debug("got: {} from concurrentQueue, it took: {}", events.size(), (System.currentTimeMillis() - start));
String eventWrapper = wrapEventsInBulk(events);
sendEventToThirdPary(eventWrapper);
}
});
}
} catch (Exception e) {
}
}