Как охватить MaxDegreeOfParallelism между несколькими блоками TPL DataFlow? - PullRequest
0 голосов
/ 15 января 2019

Я хочу ограничить общее количество запросов, которые я отправляю на сервер базы данных по всем блокам DataFlow, до 30. В следующем сценарии регулирование 30 одновременных задач происходит в каждом блоке, поэтому при выполнении всегда выполняется 60 одновременных задач.Очевидно, что я мог бы ограничить свой параллелизм до 15 на блок, чтобы получить общую системную сумму 30, но это не было бы оптимальным.

Как мне сделать эту работу?Должен ли я ограничивать (и блокировать) свои ожидания с помощью SemaphoreSlim и т. Д., Или есть внутренний подход DataFlow, который работает лучше?

public class TPLTest
{
    private long AsyncCount = 0;
    private long MaxAsyncCount = 0;
    private long TaskId = 0;
    private object MetricsLock = new object();

    public async Task Start()
    {
        ExecutionDataflowBlockOptions execOption = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 30 };
        DataflowLinkOptions linkOption = new DataflowLinkOptions() { PropagateCompletion = true };

        var doFirstIOWorkAsync = new TransformBlock<Data, Data>(async data => await DoIOBoundWorkAsync(data), execOption);
        var doCPUWork = new TransformBlock<Data, Data>(data => DoCPUBoundWork(data));
        var doSecondIOWorkAsync = new TransformBlock<Data, Data>(async data => await DoIOBoundWorkAsync(data), execOption);
        var doProcess = new TransformBlock<Data, string>(i => $"Task finished, ID = : {i.TaskId}");
        var doPrint = new ActionBlock<string>(s => Debug.WriteLine(s));

        doFirstIOWorkAsync.LinkTo(doCPUWork, linkOption);
        doCPUWork.LinkTo(doSecondIOWorkAsync, linkOption);
        doSecondIOWorkAsync.LinkTo(doProcess, linkOption);
        doProcess.LinkTo(doPrint, linkOption);

        int taskCount = 150;
        for (int i = 0; i < taskCount; i++)
        {
            await doFirstIOWorkAsync.SendAsync(new Data() { Delay = 2500 });
        }
        doFirstIOWorkAsync.Complete();

        await doPrint.Completion;
        Debug.WriteLine("Max concurrent tasks: " + MaxAsyncCount.ToString());
    }

    private async Task<Data> DoIOBoundWorkAsync(Data data)
    {
        lock(MetricsLock)
        {
            AsyncCount++;
            if (AsyncCount > MaxAsyncCount)
                MaxAsyncCount = AsyncCount;
        }

        if (data.TaskId <= 0)
            data.TaskId = Interlocked.Increment(ref TaskId);

        await Task.Delay(data.Delay);

        lock (MetricsLock)
            AsyncCount--;

        return data;
    }

    private Data DoCPUBoundWork(Data data)
    {
        data.Step = 1;
        return data;
    }
}

Класс данных:

public class Data
{
    public int Delay { get; set; }
    public long TaskId { get; set; }
    public int Step { get; set; }
}

Начальная точка:

TPLTest tpl = new TPLTest();
await tpl.Start();

Ответы [ 2 ]

0 голосов
/ 17 января 2019

Это решение, к которому я пришел в итоге (если я не могу понять, как использовать один общий блок DataFlow для маршалинга каждого типа доступа к базе данных):

Я определил SemaphoreSlim на уровне класса:

private SemaphoreSlim ThrottleDatabaseQuerySemaphore = new SemaphoreSlim(30, 30);

Я изменил класс ввода-вывода для вызова класса регулирования:

    private async Task<Data> DoIOBoundWorkAsync(Data data)
    {
        if (data.TaskId <= 0)
            data.TaskId = Interlocked.Increment(ref TaskId);

        Task t = Task.Delay(data.Delay); ;
        await ThrottleDatabaseQueryAsync(t);

        return data;
    }

Класс регулирования: (У меня также есть универсальная версия процедуры регулирования, потому что я не мог понять, как написать одну процедуру для обработки и Задачи, и Задачи )

    private async Task ThrottleDatabaseQueryAsync(Task task)
    {
        await ThrottleDatabaseQuerySemaphore.WaitAsync();
        try
        {
            lock (MetricsLock)
            {
                AsyncCount++;
                if (AsyncCount > MaxAsyncCount)
                    MaxAsyncCount = AsyncCount;
            }

            await task;
        }
        finally
        {
            ThrottleDatabaseQuerySemaphore.Release();

            lock (MetricsLock)
                AsyncCount--;
        }
    }
}
0 голосов
/ 16 января 2019

Почему бы вам не собрать все в блок действий, который имеет действительное ограничение?

var count = 0;
var ab1 = new TransformBlock<int, string>(l => $"1:{l}");
var ab2 = new TransformBlock<int, string>(l => $"2:{l}");
var doPrint = new ActionBlock<string>(
    async s =>
    {
        var c = Interlocked.Increment(ref count);
        Console.WriteLine($"{c}:{s}");
        await Task.Delay(5);
        Interlocked.Decrement(ref count);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 15 });

ab1.LinkTo(doPrint);
ab2.LinkTo(doPrint);

for (var i = 100; i > 0; i--)
{
    if (i % 3 == 0) await ab1.SendAsync(i);
    if (i % 5 == 0) await ab2.SendAsync(i);
}

ab1.Complete();
ab2.Complete();

await ab1.Completion;
await ab2.Completion;
...