BlockingCollection нескольких потребителей на группу FIFO - PullRequest
0 голосов
/ 12 ноября 2018

У меня есть один производитель, который вставляет сообщения в коллекцию BlockingCollection.

Каждое сообщение в коллекции имеет идентификатор группы. Количество различных идентификаторов групп является динамическим. Сообщения для каждой группы должны обрабатываться в порядке FIFO. Работа, которую выполняют потребители, - это небольшие вычисления и вставка базы данных, иногда дополнительный http-запрос.

Я пытался создать новый потребительский поток для каждого идентификатора группы, но есть проблемы со слишком большим количеством потоков.

Сообщения обрабатываются намного быстрее, когда я создаю 50 пользовательских потоков, таких как:

for (int i = 0; i < 50; i++)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            var group = item.groupId;
            // do work
        }
    }
}

Но с этим кодом сообщения для groupId не обрабатываются в последовательном порядке.

Есть ли простой способ сделать клиента "липким" по идентификатору группы?

например: когда сообщение с groupId 7 было обработано потребителем 3, все последующие сообщения с groupId 7 должны быть обработаны потребителем 3.

...