Как повторно поставить в очередь невыполненную фоновую задачу asyn c - PullRequest
1 голос
/ 10 апреля 2020

Добрый день, я пытаюсь создать службу очереди задач общего назначения, которая выполняется в фоновом режиме, используя BackgroundService. Я избегал использования Delegate функции Func<T1,T2, OuT> в качестве входных данных для метода EnqueueTask(Task<ResponseHelper> newTask), потому что я хотел универсальное c решение, поэтому я решил вместо него передать Task<ResponseHelper>. Но это решение не позволяет мне повторно ставить невыполненную задачу внутри ExecuteAsync(CancellationToken stoppingToken), поскольку задача возвращает экземпляр ResponseHelper, который не является копией невыполненной задачи. Пожалуйста, помогите мне исправить код, который мне нужно, чтобы вернуть отложенную задачу из функции DequeueTaskAsync(CancellationToken cancellationToken) вместо экземпляра ResponseHelper.

public interface ITaskQueueHelper
{
    void EnqueueTask(Task<ResponseHelper> newTask);

    Task<ResponseHelper> DequeueTaskAsync(CancellationToken cancellationToken);
}

public class TaskQueueHelper : ITaskQueueHelper
{
    private readonly SemaphoreSlim signal;

    private readonly ConcurrentQueue<Task<ResponseHelper>> taskQueue;

    public TaskQueueHelper()
    {
        signal = new SemaphoreSlim(0);

        taskQueue = new ConcurrentQueue<Task<ResponseHelper>>();
    }

    public void EnqueueTask(Task<ResponseHelper> newTask)
    {
        if (newTask == null)
        {
            throw new ArgumentNullException(nameof(newTask));
        }

        taskQueue.Enqueue(newTask);

        signal.Release();
    }

    public async Task<ResponseHelper> DequeueTaskAsync(CancellationToken cancellationToken)
    {
        await signal.WaitAsync(cancellationToken);

        taskQueue.TryDequeue(out var currentTask);

        /*I need to return currentTask here, instead of an instance of ResponseHelper*/

        return await currentTask;
    }
}

public class TaskQueueService : BackgroundService
{
    private readonly ITaskQueueHelper taskQueue;

    private readonly ILogger<TaskQueueService> logger;

    public TaskQueueService(
        ITaskQueueHelper _taskQueue,
        ILogger<TaskQueueService> _logger)
    {
        logger = _logger;

        taskQueue = _taskQueue;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            ResponseHelper response = await taskQueue.DequeueTaskAsync(stoppingToken);

            try
            {
                if (!response.Status.Equals(ResultCode.Success))
                {
                    /*I need to re-enqueue a failed task here*/

                    //taskQueue.EnqueueTask(currentTask);
                }
            }
            catch (Exception e)
            {
                logger.LogError(e, $"Error occurred executing {nameof(TaskQueueService)}");
            }
        }
    }
}

1 Ответ

0 голосов
/ 10 апреля 2020

для повторной попытки вы можете сделать:


    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            ResponseHelper response = await taskQueue.DequeueTaskAsync(stoppingToken);

            try
            {
                if (!response.Status.Equals(ResultCode.Success))
                {
                    // Retry the task.
                    response = await taskQueue.DequeueTaskAsync(stoppingToken);

                }
            }
            catch (Exception e)
            {
                logger.LogError(e, $"Error occurred executing {nameof(TaskQueueService)}");
            }
        }
    }

...