TPL DataFlow правильный способ обработки исключений - PullRequest
0 голосов
/ 28 октября 2019

У меня проблема в службе Windows, которая использует TPL DataFlow для управления очередью (базой данных) и перенаправляет работу в службу сетевых вычислений. И в какой-то момент BufferBlock прекращает выпускать задачи, и я не уверен, почему. Я думаю, это потому, что некоторые исключения происходят во время выполнения некоторых задач, но они подавляются, и трудно понять, в какой момент BufferBlock перестает принимать новые задачи.

Я попытался упростить это в рабочем примере ниже. У него нет обработки исключений, и мне интересно, как правильно обрабатывать исключения в TPL. Я нашел нечто подобное здесь Поток данных TPL, гарантированное завершение только тогда, когда ВСЕ исходные блоки данных завершены В этом примере у меня есть 100 запросов, и я обрабатываю данные группами по 10 запросов. Эмуляция некоторого исключения, которое происходит, если ID% 9 == 0. Если я не уловил это исключение, оно работает немного, а затем перестает принимать новые запросы. Если я обработаю и верну Result.Failure, он работает нормально, я верю, но я не уверен, что это правильный способ использовать его в производственной среде.

Я новичок в TPL, забудь меня, если я неОбъясните мой вопрос более четко. Проект GitHub

Изображение Пустые слоты

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Timers;
using CSharpFunctionalExtensions;

namespace TestTPL
{
    public class ServicePipeline
    {
        public const int batches = 100;
        private int currentBatch = 0;

        public ServicePipeline(int maxRequestsInParallel)
        {
            MaxRequestsInParallel = maxRequestsInParallel;
        }

        public int MaxRequestsInParallel { get; }
        public BufferBlock<MyData> QueueBlock { get; private set; }
        public List<TransformBlock<MyData, Result>> ExecutionBlocks
            { get; private set; }
        public ActionBlock<Result> ResultBlock { get; private set; }

        private void Init()
        {
            QueueBlock = new BufferBlock<MyData>(new DataflowBlockOptions()
                { BoundedCapacity = MaxRequestsInParallel });
            ExecutionBlocks = new List<TransformBlock<MyData, Result>>();
            ResultBlock = new ActionBlock<Result>(_ => _.OnFailure(
                () => Console.WriteLine($"Error: {_.Error}")));

            for (int blockIndex = 0; blockIndex < MaxRequestsInParallel;
                blockIndex++)
            {
                var executionBlock = new TransformBlock<MyData, Result>((d) =>
                {
                    return ExecuteAsync(d);
                }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
                executionBlock.LinkTo(ResultBlock, new DataflowLinkOptions()
                    { PropagateCompletion = true });
                QueueBlock.LinkTo(executionBlock, new DataflowLinkOptions()
                    { PropagateCompletion = true });
                ExecutionBlocks.Add(executionBlock);
            }
        }

        public static Result ExecuteAsync(MyData myData)
        {
            //try
            //{
            WebClient web = new WebClient();
            TaskCompletionSource<Result> res = new TaskCompletionSource<Result>();
            Task task = Task<Result>.Run(() => web.DownloadStringAsync(
                new Uri("http://localhost:49182/Slow.ashx")));
            task.Wait();
            Console.WriteLine($"Data = {myData}");
            if (myData != null && myData.Id % 9 == 0)
                throw new Exception("Test");
            return Result.Ok();
            //}
            //catch (Exception ex)
            //{
            //    return Result.Failure($"Exception: {ex.Message}");
            //}
        }

        public async void Start()
        {
            Init();
            while (currentBatch < batches)
            {
                Thread.Sleep(1000);
                await SubmitNextRequests();
            }
            Console.WriteLine($"Completed: {batches}");
        }

        private async Task<int> SubmitNextRequests()
        {
            var emptySlots = MaxRequestsInParallel - QueueBlock.Count;
            Console.WriteLine($"Empty slots: {emptySlots}" +
                $", left = {batches - currentBatch}");
            if (emptySlots > 0)
            {
                var dataRequests = await GetNextRequests(emptySlots);
                foreach (var data in dataRequests)
                {
                    await QueueBlock.SendAsync(data);
                }
            }
            return emptySlots;
        }

        private async Task<List<MyData>> GetNextRequests(int request)
        {
            MyData[] myDatas = new MyData[request];
            Task<List<MyData>> task = Task<List<MyData>>.Run(() =>
            {
                for (int i = 0; i < request; i++)
                {
                    myDatas[i++] = new MyData(currentBatch);
                    currentBatch++;
                }
                return new List<MyData>(myDatas);
            });
            return await task;
        }
    }

    public class MyData
    {
        public int Id { get; set; }
        public MyData(int id) => Id = id;
        public override string ToString() { return Id.ToString(); }
    }
}

РЕДАКТИРОВАТЬ: 10/30/2019 Работает, как ожидается, когдаисключение обрабатывается и вызывается явно Result.Failure ($ "Exception: {ex.Message}");

    public static Result ExecuteAsync(MyData myData)
    {
        try
        {
            WebClient web = new WebClient();
            TaskCompletionSource<Result> res = new TaskCompletionSource<Result>();
            Task task = Task<Result>.Run(() => Thread.Sleep(2000));
            task.Wait();
            Console.WriteLine($"Data = {myData}");
            if (myData != null && myData.Id % 9 == 0)
                throw new Exception("Test");
            return Result.Ok();
        }
        catch (Exception ex)
        {
            return Result.Failure($"Exception: {ex.Message}");
        }
    }

1 Ответ

0 голосов
/ 29 октября 2019

При связывании двух блоков есть возможность распространять завершение вперед, но не назад. Это становится проблемой, когда используется опция BoundedCapacity и возникает ошибка, поскольку она может заблокировать фидер конвейера и вызвать взаимоблокировку. Это довольно легко распространять завершение вручную, хотя. Вот метод, который вы можете использовать.

async void OnErrorComplete(IDataflowBlock block1, IDataflowBlock block2)
{
    await Task.WhenAny(block1.Completion); // Safe awaiting
    if (block1.Completion.IsFaulted) block2.Complete();
}

Он асинхронно ожидает завершения block1, а в случае неудачи немедленно завершает block2. Завершение восходящего блока обычно достаточно, но вы также можете распространить конкретное исключение, если хотите:

async void OnErrorPropagate(IDataflowBlock block1, IDataflowBlock block2)
{
    await Task.WhenAny(block1.Completion); // Safe awaiting
    if (block1.Completion.IsFaulted)
        block2.Fault(block1.Completion.Exception.InnerException);
}
...