У меня есть очередь RabbitMQ, которую я читаю асинхронно в пакетном режиме, но я должен сохранить порядок этих сообщений.У меня есть поле с именем ServiceNumber
, которое определяет уникальный номер сообщения, и этот порядок я должен соблюдать.
Например
SN1 SN2 SN1 SN1 SN1 SN2
1 2 3 4 5 6
В этом случае мы можем обрабатывать сообщения 1и 2 одновременно (они имеют разные SN), затем мы можем обработать 3 и 6, затем 4, затем 5.
Я попытался реализовать это через цепочку ContinueWith
следующим образом:
private readonly Dictionary<string, Task> _currentTasks = new Dictionary<string, Task>();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);
private async Task WrapMessageInQueue(string serviceNumber, Func<Task> taskFunc)
{
Task taskToAwait;
await _semaphore.WaitAsync();
try
{
_currentTasks.TryGetValue(serviceNumber, out var task);
if (task == null)
task = Task.CompletedTask;
taskToAwait = _currentTasks[serviceNumber] = task.ContinueWith(_ => taskFunc());
}
finally
{
_semaphore.Release();
}
await taskToAwait.ConfigureAwait(false);
}
void Main()
{
Task.Run(async () => {
var task1 = Task.Run(() =>
{
return WrapMessageInQueue("10", async () =>
{
await Task.Delay(5000);
Console.WriteLine("first task finished");
});
});
while (task1.Status == TaskStatus.WaitingForActivation)
{
Console.WriteLine("waiting task to be picked by a scheduler. Status = {0}", task1.Status);
await Task.Delay(100);
}
var task2 = Task.Run(() =>
{
return WrapMessageInQueue("10", async () =>
{
Console.WriteLine("second task finished");
});
});
await Task.WhenAll(new[] {task1, task2});
}).Wait();
}
Основная идея здесь заключается в том, что первое ЗАПУСКНОЕ задание должно быть завершено до начала всех остальных.Поэтому я реализовал словарь, в котором храню задачу, и каждая последующая добавляется в цепочку ContinueWith
.Таким образом, он выполняется строго после выполнения предыдущих действий.Когда приходит третье задание, оно получает свое место в очереди и т. Д.
Но по какой-то причине оно не работает, и вывод
завершение второго задания
первое задание выполнено
Что не так с этим кодом?Есть ли лучший подход?