Java: параллельная очередь с долгоживущими потребителями и гарантии обработки для недолговечных производителей - PullRequest
0 голосов
/ 13 октября 2018

Я ищу существующую реализацию для параллельной очереди, которую можно использовать для реализации типичного шаблона производитель-потребитель со следующим предупреждением.Каждый производитель недолговечен, но перед выходом должен блокироваться, пока все сообщения, помещенные в очередь, не будут обработаны потребителями.Потребитель и очередь являются долгоживущими.

Некоторые идеи, которые у меня возникли, касались либо использования одной многораздельной очереди, где каждый раздел назначен производителю, либо создания выделенной очереди для каждого производителя и создания своего родасоставная очередь поверх очередей производителей, которые будут использоваться потребителями.

Похоже, что это будет общий шаблон для серверов http, которые имеют многочисленные потоки http, которые будут действовать как производители для очереди и фиксированного числарабочих потоков, которые будут действовать как потребители очереди.Затем угроза http будет блокироваться до тех пор, пока работа, которую она ставит в очередь, не будет полностью обработана потребителями перед возвратом ответа http клиенту http.

1 Ответ

0 голосов
/ 13 октября 2018

Каждый производитель недолговечен, но перед выходом должен блокироваться, пока все сообщения, помещенные в очередь, не будут обработаны потребителями.Потребитель и очередь являются долгоживущими.

Stream.parallelStream() делает это.Это общий шаблон, который можно решить несколькими способами, например, ExecutorService.submit (task) + Future.get ().

построить своего рода составную очередь поверх очередей производителей

Я бы постарался сделать его максимально простым.

То, что работает лучше всего, зависит от загруженности ваших задач.например, он связан с ЦП или IO.

извлекает из очереди 1000 сообщений, обрабатывает сообщения, уведомляет производителей об успешно обработанных сообщениях;

List<Message> toProcess = someSourceOfMessages(1000);
toProcess.parallelStream().forEach(m -> process(m));
// do something after all tasks are complete.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...