Как остановить TransformBlock от обработки оставшихся сообщений в очереди на основе условия? - PullRequest
0 голосов
/ 01 октября 2018

Ниже приведен код для простого рабочего процесса с использованием TPL DataFlow в консольном проекте.

Три объекта Test, TestA, TestB и TestC размещены в начальном bufferBlock.Это связано с TransformBlock, который оценивает метод PerformTestAsync() каждого теста, который возвращает Task<TestResult>.TransformBlock связан с ActionBlock, который записывает результаты теста в консоль.

Все это работает нормально.Однако я пытаюсь изменить код так, чтобы ПЕРВЫЙ раз await t.PerformTestAsync() возвращал TestResult.Failed Я хочу, чтобы TransformBlock НЕ обрабатывал больше сообщений и, конечно, больше не передавался в ActionBlock, за исключением неудачного результата.Итак, для моего примера кода я хотел бы видеть только «OK» и «Failed» в окне консоли, и чтобы testC.PerformTestAsync() никогда не вызывался вообще.

Как я мог бы быть в состояниичтобы достичь этого?

Код:

class Program
{
    static void Main(string[] args)
    {

        // Create workflow blocks
        var bufferBlock = new BufferBlock<TestBase>();
        var transformBlock = new TransformBlock<TestBase, TestResult>(async t => await t.PerformTestAsync());
        var actionBlock = new ActionBlock<TestResult>(i => Console.WriteLine(i));
        // Link Blocks
        bufferBlock.LinkTo(transformBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        transformBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });

        // Create Tests
        var tests = new List<TestBase>() { new TestA(), new TestB(), new TestC() };

        // Post them into start of workflow
        foreach (var test in tests)
        {
            bufferBlock.Post<TestBase>(test);
        }
        bufferBlock.Complete();

        actionBlock.Completion.Wait();
        Console.ReadLine();
    }

}


public enum TestResult
{
    OK,
    Error,
    Failed
}

public abstract class TestBase
{
    private readonly string _name;
    public TestBase(string name)
    {
        _name = name;
    }
    public abstract Task<TestResult> PerformTestAsync();

}


public class TestA : TestBase
{
    public TestA() : base("Test A")
    {
    }

    public override Task<TestResult> PerformTestAsync()
    {
        // Do some processing for this test...
        return Task.FromResult(TestResult.OK);
    }
}

public class TestB : TestBase
{
    public TestB() : base("Test B")
    {
    }

    public override Task<TestResult> PerformTestAsync()
    {
        // Do some processing for this test...
        return Task.FromResult(TestResult.Failed);
    }
}

public class TestC : TestBase
{
    public TestC() : base("Test C")
    {
    }

    public override Task<TestResult> PerformTestAsync()
    {
        // Do some processing for this test...
        return Task.FromResult(TestResult.OK);
    }
}

1 Ответ

0 голосов
/ 01 октября 2018

Когда await t.PerformTestAsync() возвращает TestReuslt.Failed, выдается исключение.Это приведет к повреждению потока и предотвратит дальнейшую обработку.Затем поток завершится в неисправном состоянии.Дальнейшие элементы обрабатываться не будут.

var transformBlock = new TransformBlock<TestBase, TestResult>(async t =>
{
    var result = await t.PerformTestAsync();
    if (result == TestResult.Failed)
        throw new InvalidOperationException();
    return result;
});

Обратите внимание, что выбрасываемое вами исключение будет распространено на задачу Completion последнего блока, т. Е. На ActionBlock.Когда вы дождетесь этой задачи, вы сможете обрабатывать сбойный поток или игнорировать его по своему усмотрению.

...