Как я могу отложить мою задачу до ее возвращения? - PullRequest
0 голосов
/ 11 марта 2020

Мое утверждение acceptor.IsStarted.Should().BeTrue(); (см. Модульный тест ниже) всегда терпит неудачу, так как оно оценивается слишком рано. Вызов await task немедленно возвращается и не дает this.acceptor.Start() достаточно времени для ускорения.

Я бы хотел запустить мой FixAcceptor() more terministi c и для этого ввел параметр TimeSpan startupDelay.

Однако я просто понятия не имею, где и как я могу отложить запуск.

Помещение дополнительных Thread.Sleep(startupDelay) между this.acceptor.Start() и this.IsStarted = true не поможет, поскольку блокирует только саму рабочую задачу, но не вызывающий поток.

Надеюсь, понятно, что я хотел бы заархивировать и с чем борюсь. Заранее спасибо.

public class FixAcceptor
{
    // Type provided by QuickFix.net
    private readonly ThreadedSocketAcceptor acceptor;

    public FixAcceptor(IFixSettings settings)
    {
        // Shortened
    }

    public bool IsStarted { get; private set; }

    public async void Run(CancellationToken cancellationToken, TimeSpan startupDelay)
    {
        var task = Task.Run(() =>
        {
            cancellationToken.ThrowIfCancellationRequested();

            this.acceptor.Start();
            this.IsStarted = true;

            while (true)
            {
                // Stop if token has been canceled
                if (cancellationToken.IsCancellationRequested)
                {
                    this.acceptor.Stop();
                    this.IsStarted = false;

                    cancellationToken.ThrowIfCancellationRequested();
                }

                // Save some CPU cycles
                Thread.Sleep(TimeSpan.FromSeconds(1));
            }

        }, cancellationToken);

        try
        {
            await task;
        }
        catch (OperationCanceledException e)
        {
            Debug.WriteLine(e.Message);
        }
    }
}

и соответствующий код потребителя

[Fact]
public void Should_Run_Acceptor_And_Stop_By_CancelationToken()
{
    // Arrange
    var acceptor = new FixAcceptor(new FixAcceptorSettings("test_acceptor.cfg", this.logger));
    var tokenSource = new CancellationTokenSource();

    // Act
    tokenSource.CancelAfter(TimeSpan.FromSeconds(10));
    acceptor.Run(tokenSource.Token, TimeSpan.FromSeconds(3));

    // Assert
    acceptor.IsStarted.Should().BeTrue();
    IsListeningOnTcpPort(9823).Should().BeTrue();

    // Wait for cancel event to occur
    Thread.Sleep(TimeSpan.FromSeconds(15));
    acceptor.IsStarted.Should().BeFalse();
}

Ответы [ 2 ]

2 голосов
/ 11 марта 2020

Добавление задержек для достижения детерминизма не рекомендуется. Вы можете достичь 100% детерминизма, используя TaskCompletionSource для контроля завершения задачи в нужный момент:

public Task<bool> Start(CancellationToken cancellationToken)
{
    var startTcs = new TaskCompletionSource<bool>();
    var task = Task.Run(() =>
    {
        cancellationToken.ThrowIfCancellationRequested();

        this.acceptor.Start();
        this.IsStarted = true;
        startTcs.TrySetResult(true); // Signal that the starting phase is completed

        while (true)
        {
            // ...
        }

    }, cancellationToken);
    HandleTaskCompletion();
    return startTcs.Task;

    async void HandleTaskCompletion() // async void method = should never throw
    {
        try
        {
            await task;
        }
        catch (OperationCanceledException ex)
        {
            Debug.WriteLine(ex.Message);
            startTcs.TrySetResult(false); // Signal that start failed
        }
        catch
        {
            startTcs.TrySetResult(false); // Signal that start failed
        }
    }
}

Затем замените эту строку в своем тесте:

acceptor.Run(tokenSource.Token, TimeSpan.FromSeconds(3));

... с этим:

bool startResult = await acceptor.Start(tokenSource.Token);

Другая проблема, которая попалась мне на глаза, это свойство bool IsStarted, которое мутирует из одного потока и наблюдается другим без синхронизации. На самом деле это не проблема, потому что вы можете положиться на недокументированный барьер памяти, который вставляется автоматически на каждый await, и быть уверенным, что у вас не будет проблем с видимостью, но если вы хотите быть уверены, что можете синхронизировать получить доступ с помощью lock (наиболее надежного) или создать резервную копию свойства с помощью частного поля volatile, например:

private volatile bool _isStarted;
public bool IsStarted => _isStarted;
1 голос
/ 11 марта 2020

Я бы порекомендовал вам структурировать ваш FixAcceptor.Run() метод немного по-другому

public async Task Run(CancellationToken cancellationToken, TimeSpan startupDelay)
{
    var task = Task.Run(async () =>
    {
        try 
        {
            cancellationToken.ThrowIfCancellationRequested();

            this.acceptor.Start();
            this.IsStarted = true;

            while (true)
            {
                // Stop if token has been canceled
                if (cancellationToken.IsCancellationRequested)
                {
                    this.acceptor.Stop();
                    this.IsStarted = false;

                    cancellationToken.ThrowIfCancellationRequested();
                }

                // Save some CPU cycles
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }
        catch (OperationCanceledException e)
        {
            Debut.WriteLine(e.Message);
        }
    }, cancellationToken);

    await Task.Delay(startupDelay);
}

, поэтому обработка исключений находится во внутренней задаче, а метод Run возвращает Task, который завершается после startupDelay. (Я также обменял Thread.Sleep() на Task.Delay()). Затем в методе теста вы можете ожидать Task, возвращаемое Run

[Fact]
public async Task Should_Run_Acceptor_And_Stop_By_CancelationToken()
{
    // Arrange
    var acceptor = new FixAcceptor(new FixAcceptorSettings("test_acceptor.cfg", this.logger));
    var tokenSource = new CancellationTokenSource();

    // Act
    tokenSource.CancelAfter(TimeSpan.FromSeconds(10));
    await acceptor.Run(tokenSource.Token, TimeSpan.FromSeconds(3));

    // Assert
    acceptor.IsStarted.Should().BeTrue();
    IsListeningOnTcpPort(9823).Should().BeTrue();

    // Wait for cancel event to occur
    Thread.Sleep(TimeSpan.FromSeconds(15));
    acceptor.IsStarted.Should().BeFalse();
}

Все будет в порядке, чтобы сделать метод async (шов, как будто вы используете xunit)

...