У меня есть командный процессор, который я переписываю для нескольких потоков.Я иду от однопоточного последовательного отправителя.Где у нас есть делегат func
, переданный в метод, который затем решает, как вызвать делегат:
internal class SingleThreadedSendStrategy
{
public Task<TResponse> Send<TResponse>(Func<IRequest<TResponse>, CancellationToken, Task<TResponse>> func, IRequest<TResponse> request, CancellationToken cancellationToken)
{
return func(request, cancellationToken);
}
}
Для многопоточной (n потоков) настройки, где у каждого потока есть очередь, а командыдобавляется в определенную очередь в зависимости от хэш-кода сообщения, как показано ниже:
internal class ShardedSendStrategy
{
const int QueueCount = 8;
readonly BlockingCollection<Task>[] Queues = new BlockingCollection<Task>[QueueCount];
readonly CancellationToken CancellationToken;
public ShardedSendStrategy(CancellationToken cancellationToken)
{
CancellationToken = cancellationToken;
for (int i = 0; i < QueueCount; i++)
{
Queues[i] = new BlockingCollection<Task>();
var thread = new Thread( () => OnHandlerStart(Queues[i])) { IsBackground = true };
thread.Start();
}
}
public Task<TResponse> Send<TResponse>(Func<IRequest<TResponse>, CancellationToken, Task<TResponse>> func, IRequest<TResponse> request, CancellationToken cancellationToken = default)
{
var shard = request.GetHashCode() % QueueCount;
var task = new Task(() => func(request, cancellationToken));
Queues[shard].Add(task);
return task; // we have a problem here, Task<TResponse> is expected
}
private void OnHandlerStart(BlockingCollection<Task> queue)
{
foreach (var job in queue.GetConsumingEnumerable(CancellationToken))
{
job.Start();
}
}
}
Однако, если в функции Send
я отправил задачу на var task = new Task(() => func(request, cancellationToken));
, я не могу вернуть задачу, так как компилятор выдает Cannot implicitly convert type 'System.Threading.Tasks.Task' to 'System.Threading.Tasks.Task<TResponse>'.
.
Если в функции Send
я установил var task = new Task<TResponse>( () => { return func(request, cancellationToken); });
, то выдается Cannot convert lambda expression to intended delegate type because some of the return types in the block are not implicitly convertible to the delegate return type
.
Как я могу разрешить ошибки приведения, пожалуйста, чтобы я мог снять работу с работы ипобежал в приемнике (я не беспокоюсь о каких-либо конкретных типах объектов, так как мы установили функцию для вызова в конструкторе Task)?
Как также я бы обратился к вызывающему потоку, который вызвал Send, чтофункция завершена в методе OnHandlerStart
?
Любые другие лучшие практики, которые я должен реализовать для этой настройки многозадачного потребителя, и, если я заново изобретаю колесо, какой-то код на tего вариант использования с использованием существующей структуры TPL будет высоко оценен?