Ручное управление порядком, в котором выполняются задачи - PullRequest
0 голосов
/ 25 мая 2018

У меня есть очередь RabbitMQ, которую я читаю асинхронно в пакетном режиме, но я должен сохранить порядок этих сообщений.У меня есть поле с именем ServiceNumber, которое определяет уникальный номер сообщения, и этот порядок я должен соблюдать.

Например

   SN1 SN2 SN1 SN1 SN1 SN2
   1   2   3   4   5   6 

В этом случае мы можем обрабатывать сообщения 1и 2 одновременно (они имеют разные SN), затем мы можем обработать 3 и 6, затем 4, затем 5.

Я попытался реализовать это через цепочку ContinueWith следующим образом:

private readonly Dictionary<string, Task> _currentTasks = new Dictionary<string, Task>();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);

private async Task WrapMessageInQueue(string serviceNumber, Func<Task> taskFunc)
{
    Task taskToAwait;
    await _semaphore.WaitAsync();
    try
    {
        _currentTasks.TryGetValue(serviceNumber, out var task);
        if (task == null)
            task = Task.CompletedTask;

        taskToAwait = _currentTasks[serviceNumber] = task.ContinueWith(_ => taskFunc());
    }
    finally
    {
        _semaphore.Release();
    }

    await taskToAwait.ConfigureAwait(false);
}

void Main()
{
    Task.Run(async () => {
        var task1 = Task.Run(() =>
        {
            return WrapMessageInQueue("10", async () =>
            {
                await Task.Delay(5000);
                Console.WriteLine("first task finished");
            });
        });

        while (task1.Status == TaskStatus.WaitingForActivation) 
        {
            Console.WriteLine("waiting task to be picked by a scheduler. Status = {0}", task1.Status);
            await Task.Delay(100);
        }

        var task2 = Task.Run(() =>
        {
            return WrapMessageInQueue("10", async () =>
            {
                Console.WriteLine("second task finished");
            });
        });

        await Task.WhenAll(new[] {task1, task2});
    }).Wait();
}

Основная идея здесь заключается в том, что первое ЗАПУСКНОЕ задание должно быть завершено до начала всех остальных.Поэтому я реализовал словарь, в котором храню задачу, и каждая последующая добавляется в цепочку ContinueWith.Таким образом, он выполняется строго после выполнения предыдущих действий.Когда приходит третье задание, оно получает свое место в очереди и т. Д.

Но по какой-то причине оно не работает, и вывод

завершение второго задания

первое задание выполнено

Что не так с этим кодом?Есть ли лучший подход?

Ответы [ 3 ]

0 голосов
/ 25 мая 2018

Проблема в том, что этот фрагмент кода

task.ContinueWith(_ => taskFunc());

не выполняет то, что вы ожидаете.Он создает задачу, которая просто запускает продолжение, но не ждет ее выполнения.В результате оба ваших продолжения вызываются немедленно.

В целом у вас слишком много ненужных задач, которые частично не ожидаются должным образом.Я убрал его и реализовал функцию продолжения, которая необходима для его работы.

public static
class TaskExtensions
{
    public static async
    Task ContinueWith(this Task task, Func<Task> continuation)
    {
        await task;
        await continuation();
    }
}

class Program
{
    static readonly Dictionary<string, Task> _currentTasks = new Dictionary<string, Task>();

    private static
    Task WrapMessageInQueue(string serviceNumber, Func<Task> taskFunc)
    {
        lock (_currentTasks)
        {
            if (!_currentTasks.TryGetValue(serviceNumber, out var task))
                task = Task.CompletedTask;

            return _currentTasks[serviceNumber] = task.ContinueWith(() => taskFunc());
        }
    }

    public static
    void Main(string[] args)
    {
        Task.Run(async () =>
        {
            var task1 = WrapMessageInQueue("10", async () =>
            {
                await Task.Delay(500);
                Console.WriteLine("first task finished");
            });

            var task2 = WrapMessageInQueue("10", async () =>
            {
                Console.WriteLine("second task finished");
            });

            await Task.WhenAll(new[] { task1, task2 });
        }).Wait();
    }
}
0 голосов
/ 26 мая 2018

Интересный подход.Но поскольку обработка сообщений (по SN) в любом случае должна быть последовательной, зачем вам одна задача на сообщение?Это только усложняет задачу, потому что вам нужно контролировать порядок выполнения задач.

Почему бы не иметь задачу сборщика, которая сортирует входящие сообщения в очереди (по SN) и запускает одну задачу на каждый SN для обработкиочередь?

0 голосов
/ 25 мая 2018

Вы используете Task.Run, чтобы добавить свои тестовые задачи в очередь и получить условие гонки здесь - нет никакой гарантии, что task1 будет выбран пулом потоков ранее, чем task2.

* 1005.* Не уверен, что это действительно так.

Возможно, вы захотите проверить библиотеку TPL Dataflow , я предполагаю, что она вполне подходит для описанного сценария.

Или даже группапо SN с использованием Reactive расширений, а затем обработать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...