ASP. Net Параллельная обработка фоновых задач в очереди Core - PullRequest
1 голос
/ 13 июля 2020

У меня есть базовый веб-API ASP. NET, который использует фоновые задачи в очереди, как описано здесь .

Я использовал предоставленный образец кода и добавил IBackgroundTaskQueue, BackgroundTaskQueue и QueuedHostedService точно так, как описано в статье.

В моем Startup.cs я регистрирую только один экземпляр QueuedHostedService следующим образом: services.AddHostedService<QueuedHostedService>();

Задачи, поступающие от контроллера WebApi, ставятся в очередь, а затем удаляются и выполняются одна за другой QueuedHostedService.

Я хотел бы разрешить более одного потока фоновой обработки, который будет исключать из очереди и выполнять входящие задачи. Самое простое решение, которое я могу придумать, - это зарегистрировать более одного экземпляра QueuedHostedService в моем Startup.cs. например, что-то вроде этого:

 int maxNumOfParallelOperations;
 var isValid = int.TryParse(Configuration["App:MaxNumOfParallelOperations"], out maxNumOfParallelOperations);

 maxNumOfParallelOperations = isValid && maxNumOfParallelOperations > 0 ? maxNumOfParallelOperations : 2;

 for (int index = 0; index < maxNumOfParallelOperations; index++) 
 {
    services.AddHostedService<QueuedHostedService>();
 }

Я также заметил, что благодаря единственному семафору в BackgroundTaskQueue экземпляры QueuedHostedService на самом деле не работают все время, а только просыпаются при новом Задача доступна в очереди.

Это решение, кажется, отлично работает в моих тестах.

Но, в этом конкретном случае использования - действительно ли это действительное рекомендуемое решение для параллельной обработки?

1 Ответ

1 голос
/ 13 июля 2020

Вы можете использовать IHostedService с несколькими потоками для использования IBackgroundTaskQueue.

Вот базовая реализация c. Я предполагаю, что вы используете те же IBackgroundTaskQueue и BackgroundTaskQueue, описанные здесь .

public class QueuedHostedService : IHostedService
{
    private readonly ILogger _logger;

    private readonly Task[] _executors;
    private readonly int _executorsCount = 2; //--default value: 2
    private CancellationTokenSource _tokenSource;
    public IBackgroundTaskQueue TaskQueue { get; }

    public QueuedHostedService(IBackgroundTaskQueue taskQueue,
        ILoggerFactory loggerFactory,
        IConfiguration configuration)
    {
        TaskQueue = taskQueue;
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();

        if (ushort.TryParse(configuration["App:MaxNumOfParallelOperations"], out var ct))
        {
            _executorsCount = ct;
        }
        _executors = new Task[_executorsCount];
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queued Hosted Service is starting.");

        _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        for (var i = 0; i < _executorsCount; i++)
        {
            var executorTask = new Task(
                async () =>
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {
#if DEBUG
                    _logger.LogInformation("Waiting background task...");
#endif
                    var workItem = await TaskQueue.DequeueAsync(cancellationToken);

                        try
                        {
#if DEBUG
                        _logger.LogInformation("Got background task, executing...");
#endif
                        await workItem(cancellationToken);
                        }
                        catch (Exception ex)
                        {
                            _logger.LogError(ex,
                                "Error occurred executing {WorkItem}.", nameof(workItem)
                            );
                        }
                    }
                }, _tokenSource.Token);

            _executors[i] = executorTask;
            executorTask.Start();
        }

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queued Hosted Service is stopping.");
        _tokenSource.Cancel(); // send the cancellation signal

        if (_executors != null)
        {
            // wait for _executors completion
            Task.WaitAll(_executors, cancellationToken);
        }

        return Task.CompletedTask;
    }
}

Вам необходимо зарегистрировать службы в ConfigureServices в классе Startup.

...
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
services.AddHostedService<QueuedHostedService>();
...

Дополнительно вы можете установить количество потоков в конфигурации (appsettings.json)

...
"App": {
    "MaxNumOfParallelOperations": 4
}
...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...