Java: реализация многопоточного конвейера поставщик / потребитель с параллельными ограничениями для каждого типа задач - PullRequest
2 голосов
/ 04 апреля 2019

Нам нужно обрабатывать различные типы объектов асинхронно.Каждый вид / тип объектов обрабатывается с использованием ключа API.

Каждый ключ API имеет собственные ограничения на одновременное использование (например, не более 5 параллельных сеансов для одного ключа API).

У нас есть глобальное ограничение на количество рабочих потоков (ограничения ЦП).

Мы хотели бы сделать как можно больше вызовов API в пределах рабочего потока.

Возможные решения для:

2 tasks with KEY1 (max 2 session) -\  total 3 workers
5 tasks with KEY2 (max 3 session) -/

:

1. worker1: KEY2, worker2: KEY2, worker3: KEY2 (in queue: 2x KEY1, 2x KEY2)
2. worker1: KEY1, worker2: KEY2, worker3: KEY2 (in queue: 1x KEY1, 3x KEY2)
3. worker1: KEY1, worker2: KEY1, worker3: KEY2 (in queue: 4x KEY2)

Возможные решения для:

3 tasks with KEY1 (max 1 session) & 3 workers

это:

1. worker1: KEY1, worker2: IDLE, worker3: IDLE, (in queue 2x KEY1)

Порядок выполнения не имеет значения (но мы бы хотели услышать, что сначала в порядке, как политика), максимумпропускная способность является наиболее важной.

Не ясно, какую стратегию реализации выбрать.

ThreadExecutor с любой очередью недостаточно, поскольку вам необходимо знать, какие ключи API в настоящее время используются ThreadExecutor.

Ответы [ 2 ]

1 голос
/ 05 апреля 2019

Возможно, я бы создал службу, поддерживающую

  • одну Queue, содержащую записи, состоящие из задачи и соответствующего ключа,
  • a Map с ключом и ужетекущие потоки этого ключа (Map<String,AtomicInteger>) и
  • a ThreadPoolExecutor с глобально разрешенным счетчиком потоков.

Если глобальный счетчик потоков заполнен и задание отправлено, он помещается в конец очереди.

Если глобальный счетчик потоков не заполнен, значение карты, соответствующее ключу запроса, проверяется на ограничение потока ключей;если оно достигнуто, задание помещается обратно в очередь, в противном случае отправляется в службу исполнителя.

«отправка в службу исполнителя» не будет отправлять задание напрямую, но увеличит число потоков ключей и обернет егозадача в Runnable, которая дополнительно 1. уменьшит количество потоков ключей на карте и 2. вызовет переоценку очереди, чтобы при необходимости были отправлены новые задачи.

Возможно также созданиелогика «активного счетчика на ключ» в BlockingQueue, которая будет возвращать как first() следующий элемент, содержащий задачу для ключа, максимальное количество которых не было достигнуто, и передавать его в виде очереди управления конструктору ThreadPoolExecutor;но я уверен, что это нарушит контракт очереди и не будет полностью безопасным для использования.

1 голос
/ 04 апреля 2019

Я не уверен, что правильно понял вопрос, но, похоже, вам нужен Semaphore для каждого ключа API.

Semaphore key1Semaphore = new Semaphore(2);
Semaphore key2Semaphore = new Semaphore(3);

Вы можете проверить, есть ли у key1Semaphore разрешения, и получить их, если они доступны, позвонив по номеру key1Semaphore.tryAcquire(). Это неблокирующая функция, поэтому, если она завершится с ошибкой и вернет false, вы можете попытаться получить семафор из другого ключа API и отправить задачу из этого.

Важно, чтобы в конце задачи с использованием одного из ключей API разрешение семафора возвращалось обратно.

Вам может потребоваться дополнительный объект для синхронизации с wait() и notify(), чтобы после завершения задачи он уведомлял основной поток, который отправляет задачи, чтобы снова проверить семафоры.

По сути, вы получаете то, что ваш диспетчер задач отправит 5 задач вашим ExecutorService из 3 рабочих, и затем он не сможет отправлять больше, пока не будет выпущено одно из разрешений семафора.

Когда задача завершается и разрешение высвобождается, диспетчер получает уведомление, поэтому он разблокируется от ожидания и снова проверяет семафоры по порядку и отправляет задачи в ExecutorService.

Это решение немного смещено в сторону первых ключей API, но вы можете уточнить его еще больше, проверив длину задач для каждого ключа и распределив их более справедливо. Вы можете даже повернуть индекс, чтобы при каждом цикле индекс увеличивался на 1, чтобы при первом запуске с API KEY 1, а при втором - с API KEY 2 и т. Д.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...