Реализация многопоточного потребителя по идентификатору сегмента сообщения - PullRequest
1 голос
/ 27 сентября 2019

У меня есть командный процессор, который я переписываю для нескольких потоков.Я иду от однопоточного последовательного отправителя.Где у нас есть делегат func, переданный в метод, который затем решает, как вызвать делегат:

internal class SingleThreadedSendStrategy
{
    public Task<TResponse> Send<TResponse>(Func<IRequest<TResponse>, CancellationToken, Task<TResponse>> func, IRequest<TResponse> request, CancellationToken cancellationToken)
    {
        return func(request, cancellationToken);
    }
}

Для многопоточной (n потоков) настройки, где у каждого потока есть очередь, а командыдобавляется в определенную очередь в зависимости от хэш-кода сообщения, как показано ниже:

internal class ShardedSendStrategy
{
    const int QueueCount = 8;
    readonly BlockingCollection<Task>[] Queues = new BlockingCollection<Task>[QueueCount];
    readonly CancellationToken CancellationToken;

    public ShardedSendStrategy(CancellationToken cancellationToken)
    {
        CancellationToken = cancellationToken;

        for (int i = 0; i < QueueCount; i++)
        {
            Queues[i] = new BlockingCollection<Task>();
            var thread = new Thread( () => OnHandlerStart(Queues[i])) { IsBackground = true };
            thread.Start();
        }
    }

    public Task<TResponse> Send<TResponse>(Func<IRequest<TResponse>, CancellationToken, Task<TResponse>> func, IRequest<TResponse> request, CancellationToken cancellationToken = default)
    {
        var shard = request.GetHashCode() % QueueCount;

        var task = new Task(() => func(request, cancellationToken));
        Queues[shard].Add(task);

        return task; // we have a problem here, Task<TResponse> is expected
    }

    private void OnHandlerStart(BlockingCollection<Task> queue)
    {
        foreach (var job in queue.GetConsumingEnumerable(CancellationToken))
        {
            job.Start();
        }
    }
}

Однако, если в функции Send я отправил задачу на var task = new Task(() => func(request, cancellationToken));, я не могу вернуть задачу, так как компилятор выдает Cannot implicitly convert type 'System.Threading.Tasks.Task' to 'System.Threading.Tasks.Task<TResponse>'..

Если в функции Send я установил var task = new Task<TResponse>( () => { return func(request, cancellationToken); });, то выдается Cannot convert lambda expression to intended delegate type because some of the return types in the block are not implicitly convertible to the delegate return type.

Как я могу разрешить ошибки приведения, пожалуйста, чтобы я мог снять работу с работы ипобежал в приемнике (я не беспокоюсь о каких-либо конкретных типах объектов, так как мы установили функцию для вызова в конструкторе Task)?

Как также я бы обратился к вызывающему потоку, который вызвал Send, чтофункция завершена в методе OnHandlerStart?

Любые другие лучшие практики, которые я должен реализовать для этой настройки многозадачного потребителя, и, если я заново изобретаю колесо, какой-то код на tего вариант использования с использованием существующей структуры TPL будет высоко оценен?

Ответы [ 2 ]

2 голосов
/ 27 сентября 2019

Конструктор Task не разворачивает задачу в делегате.Он вообще не знает об этом и не заботится об этом.Это одна из вещей, которую вы получаете за использование «расширенного сценария» - не ожидайте, что new Task работает так же, как Task.Run, только без запуска делегата.Это не так.

Более типичным способом реализации того, что вы пытаетесь сделать, было бы создание собственного планировщика задач.Но если вы хотите использовать свой подход, вам нужно выполнить развертывание вручную.Это может быть немного сложнее, чем кажется, но, к счастью, есть метод, который делает именно то, что вам нужно:

var task = new Task<Task<TResponse>>(() => func(request, cancellationToken)).Unwrap();

Менее к счастью, это все еще задача в стиле обещания, и ее нельзя запустить.Так что вам нужно пойти еще дальше:

internal class ShardedSendStrategy
{
  const int QueueCount = 8;
  readonly BlockingCollection<Action>[] Queues 
    = new BlockingCollection<Action>[QueueCount];
  readonly CancellationToken CancellationToken;

  public ShardedSendStrategy(CancellationToken cancellationToken)
  {
    CancellationToken = cancellationToken;

    for (int i = 0; i < QueueCount; i++)
    {
      var id = i;
      Queues[id] = new BlockingCollection<Action>();
      var thread = new Thread( () => OnHandlerStart(Queues[id])) 
                             { IsBackground = true };
      thread.Start();
    }
  }

  public Task<TResponse> Send<TResponse>(
    Func<IRequest<TResponse>, CancellationToken, Task<TResponse>> func, 
    IRequest<TResponse> request, CancellationToken cancellationToken = default)
  {
    var shard = request.GetHashCode() % QueueCount;
    var tcs = new TaskCompletionSource<TResponse>();

    Queues[shard].Add(() => {
      var result = func(request, cancellationToken).Result;

      tcs.SetResult(result);
    });

    return tcs.Task;
  }

  private void OnHandlerStart(BlockingCollection<Action> queue)
  {
    foreach (var job in queue.GetConsumingEnumerable(CancellationToken))
    {
      job();
    }
  }
}

Основная проблема заключается в том, что это ожидает результата задачи синхронно в фоновом потоке.Это может быть проблемой, если есть зависимости между вашими задачами - вы можете получить тупик, когда одна задача ожидает выполнения другой задачи, но она не может выполнить свою работу, потому что она ожидает места в вашей очередиMsgstr ".

Если вы решили использовать подобный подход, обязательно добавьте правильную обработку ошибок.Вам нужно обрабатывать исключения, отмены и т. Д.

Опять же, реализация собственного планировщика задач, вероятно, является лучшей идеей.

1 голос
/ 27 сентября 2019

Вот как вы можете использовать класс ActionBlock из библиотеки TPL Dataflow для обработки запросов и получения задачи для каждого запроса.В основном вам просто нужно передать TaskCompletionSource<TResponse> вместе с каждым запросом.Соединение их вместе с помощью ValueTuple удобно:

public class ActionTaskBlock<TRequest, TResponse>
{
    private readonly ActionBlock<(TRequest,
        TaskCompletionSource<TResponse>)> _actionBlock;

    /// <summary>Initializes a new instance of the
    /// <see cref="ActionTaskBlock{TRequest,TResponse}"/> class with the
    /// specified process delegate, cancellation token, and max degree of
    /// parallelism.</summary>
    public ActionTaskBlock(Func<TRequest, CancellationToken, TResponse> process,
        CancellationToken cancellationToken, int maxDegreeOfParallelism)
    {
        _actionBlock = new ActionBlock<
            (TRequest Request, TaskCompletionSource<TResponse> TCS)>(entry =>
        {
            try
            {
                var response = process(entry.Request, cancellationToken);
                entry.TCS.SetResult(response);
            }
            catch (OperationCanceledException)
            {
                entry.TCS.TrySetCanceled();
            }
            catch (Exception ex)
            {
                entry.TCS.TrySetException(ex);
            }
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
        });
    }

    /// <summary>Signals to the block that it shouldn't accept any more
    /// requests.</summary>
    public void Complete() => _actionBlock.Complete();

    /// <summary>Gets a <see cref="Task"/> object that represents the
    /// asynchronous operation and completion of the block.</summary>
    public Task Completion => _actionBlock.Completion;

    /// <summary>Schedules a <typeparamref name="TRequest"/> for processing
    /// by the block.</summary>
    public async Task<TResponse> ProcessAsync(TRequest request)
    {
        var tsc = new TaskCompletionSource<TResponse>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        await _actionBlock.SendAsync((request, tsc)).ConfigureAwait(false);
        return await tsc.Task.ConfigureAwait(false);
    }
}

Пример использования:

// Create the block
var cts = new CancellationTokenSource(2000); // Cancel after 2000 msec
var block = new ActionTaskBlock<int, int>((item, token) =>
{
    Console.WriteLine($"Start processing {item}");
    Task.WhenAny(Task.Delay(1000, token)).Wait(); // Sleep safely for 1000 msec
    token.ThrowIfCancellationRequested();
    return item * 2;
}, cts.Token, maxDegreeOfParallelism: 2); // Process no more than 2 at a time

// Feed the block with one request every 300 msec
foreach (var i in Enumerable.Range(1, 10))
{
    Console.WriteLine($"Scheduling {i}");
    block.ProcessAsync(i).ContinueWith(t =>
    {
        Console.WriteLine($"Item {i} processed with status {t.Status}");
    });
    Thread.Sleep(300);
    if (cts.IsCancellationRequested) break;
}
block.Complete();

// Wait for the completion of all requests, or the cancellation of the token
Task.WhenAny(block.Completion).Wait(); // Safe waiting (doesn't throw)
Console.WriteLine($"The block finished with status {block.Completion.Status}");

Вывод:

Планирование 1
Запуск обработки1
Планирование 2
Начало обработки 2
Планирование 3
Планирование 4
Элемент 1 обработан со статусом RanToCompletion
Начать обработку 3
Планирование 5
Элемент 2 обработан со статусомRanToCompletion
Начало обработки 4
Планирование 6
Планирование 7
Элемент 4 обработан со статусом Отменено
Элемент 3 обработан со статусом Отменено
Блок завершен со статусом Отменено

Конструктор класса ActionTaskBlock принимает только синхронные делегаты.Было бы довольно легко реализовать перегрузку, которая принимает в качестве аргумента асинхронный делегат.Все блоки библиотеки TPL Dataflow готовы к асинхронной обработке, что означает, что они также принимают асинхронные делегаты: async entry => { ...= await process(...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...