Время распространения запроса на отмену ко всем задачам (TPL) - PullRequest
0 голосов
/ 07 октября 2018

С TPL у нас есть CancellationTokenSource, который предоставляет токены, полезные для совместной отмены текущего задания (или его запуска).

Вопрос:

Как долго этопринять, чтобы распространить запрос отмены на все подключенные запущенные задачи?Есть ли место, где код мог бы посмотреть, чтобы проверить, что «отныне» каждый заинтересованный Task найдет, что запрос на отмену был запрошен?


Зачем это нужно?

Я хотел бы иметь стабильный модульный тест, чтобы показать, что в нашем коде работает отмена.

Детали проблемы:

У нас есть "Executor", который создает задачи, эти задачи переносятся довольно долгобегущие действия.Основная задача исполнителя - ограничить количество одновременных действий.Все эти задачи могут быть отменены по отдельности, а также эти действия будут учитывать CancellationToken внутренне.

Я хотел бы предоставить модульный тест, который показывает, что когда отмена произошла, в то время как задача ожидает slot для запуска заданного действия , эта задача отменит себя (в конце концов) и не начнет выполнение заданного действия .

Итак, идея заключалась в подготовке LimitingExecutor с одним слотом .Затем запустите блокирующее действие , которое будет запрашивать отмену при разблокировании.Затем «поставьте в очередь» тестовое действие , которое должно завершиться неудачно при выполнении.При такой настройке тесты будут вызывать разблокировать , а затем утверждать, что задача тестовое действие выдаст TaskCanceledException при ожидании.

[Test]
public void RequestPropagationTest()
{
    using (var setupEvent = new ManualResetEvent(initialState: false))
    using (var cancellation = new CancellationTokenSource())
    using (var executor = new LimitingExecutor())
    {
        // System-state setup action:
        var cancellingTask = executor.Do(() =>
        {
            setupEvent.WaitOne();
            cancellation.Cancel();
        }, CancellationToken.None);

        // Main work action:
        var actionTask = executor.Do(() =>
        {
            throw new InvalidOperationException(
                "This action should be cancelled!");
        }, cancellation.Token);

        // Let's wait until this `Task` starts, so it will got opportunity
        // to cancel itself, and expected later exception will not come
        // from just starting that action by `Task.Run` with token:
        while (actionTask.Status < TaskStatus.Running)
            Thread.Sleep(millisecondsTimeout: 1);

        // Let's unblock slot in Executor for the 'main work action'
        // by finalizing the 'system-state setup action' which will
        // finally request "global" cancellation:
        setupEvent.Set();

        Assert.DoesNotThrowAsync(
            async () => await cancellingTask);

        Assert.ThrowsAsync<TaskCanceledException>(
            async () => await actionTask);
    }
}

public class LimitingExecutor : IDisposable
{
    private const int UpperLimit = 1;
    private readonly Semaphore _semaphore
        = new Semaphore(UpperLimit, UpperLimit);

    public Task Do(Action work, CancellationToken token)
        => Task.Run(() =>
        {
            _semaphore.WaitOne();
            try
            {
                token.ThrowIfCancellationRequested();
                work();
            }
            finally
            {
                _semaphore.Release();
            }
        }, token);

    public void Dispose()
        => _semaphore.Dispose();
}

Исполняемая демонстрация (через NUnit) этой проблемы можно найти в GitHub .

Однако реализация этого теста иногда дает сбой (не ожидается TaskCanceledException), на моем компьютере может быть 1 из 10 запусков.Вид "решения" этой проблемы - вставить Thread.Sleep сразу после запроса на отмену.Даже при 3-секундном сне этот тест иногда дает сбой (обнаруживается после 20-ти прогонов), и когда он проходит, такое долгое ожидание обычно не требуется (я полагаю).Для справки см. diff .

"Другая проблема", чтобы гарантировать, что отмена происходит из "времени ожидания", а не из Task.Run, потому что ThreadPool может быть занят (другие выполняющие тесты), и это откладывает запуск второго задания после запроса отмены - что сделало бы этот тест «ложно-зеленым».«Легкое исправление взломом» состояло в том, чтобы активно ждать, пока не запустится вторая задача - ее Status становится TaskStatus.Running.Пожалуйста, проверьте версию под этой веткой и убедитесь, что тест без этого взлома будет иногда "зеленым" - поэтому пробная ошибка может пройти через него.

1 Ответ

0 голосов
/ 07 октября 2018

Ваш тестовый метод предполагает, что cancellingTask всегда занимает слот (входит в семафор) в LimitingExecutor перед actionTask.К сожалению, это предположение неверно, LimitingExecutor не гарантирует этого, и это просто вопрос удачи, какая из двух задач занимает слот (на самом деле на моем компьютере это происходит только в 5% случаев).

Чтобы решить эту проблему, вам нужен еще один ManualResetEvent, который позволит основному потоку ждать, пока cancellingTask фактически не займет слот:

using (var slotTaken = new ManualResetEvent(initialState: false))
using (var setupEvent = new ManualResetEvent(initialState: false))
using (var cancellation = new CancellationTokenSource())
using (var executor = new LimitingExecutor())
{
    // System-state setup action:
    var cancellingTask = executor.Do(() =>
    {
        // This is called from inside the semaphore, so it's
        // certain that this task occupies the only available slot.
        slotTaken.Set();

        setupEvent.WaitOne();
        cancellation.Cancel();
    }, CancellationToken.None);

    // Wait until cancellingTask takes the slot
    slotTaken.WaitOne();

    // Now it's guaranteed that cancellingTask takes the slot, not the actionTask

    // ...
}

.NET Framework не предоставляет API для обнаружения перехода задачи в состояние Running, поэтому, если вам не нравится опрос свойства State + Thread.Sleep() вцикл, вам нужно изменить LimitingExecutor.Do(), чтобы предоставить эту информацию, возможно, с использованием другого ManualResetEvent, например:

public Task Do(Action work, CancellationToken token, ManualResetEvent taskRunEvent = null)
    => Task.Run(() =>
    {
        // Optional notification to the caller that task is now running
        taskRunEvent?.Set();

        // ...
    }, token);
...