У меня есть один производитель, который вставляет сообщения в коллекцию BlockingCollection.
Каждое сообщение в коллекции имеет идентификатор группы. Количество различных идентификаторов групп является динамическим. Сообщения для каждой группы должны обрабатываться в порядке FIFO.
Работа, которую выполняют потребители, - это небольшие вычисления и вставка базы данных, иногда дополнительный http-запрос.
Я пытался создать новый потребительский поток для каждого идентификатора группы, но есть проблемы со слишком большим количеством потоков.
Сообщения обрабатываются намного быстрее, когда я создаю 50 пользовательских потоков, таких как:
for (int i = 0; i < 50; i++)
{
Task.Factory.StartNew(() =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
var group = item.groupId;
// do work
}
}
}
Но с этим кодом сообщения для groupId не обрабатываются в последовательном порядке.
Есть ли простой способ сделать клиента "липким" по идентификатору группы?
например: когда сообщение с groupId 7 было обработано потребителем 3, все последующие сообщения с groupId 7 должны быть обработаны потребителем 3.