Исправлено асинхронное выполнение метода C # - PullRequest
1 голос
/ 04 апреля 2019

Проблема, с которой я сталкиваюсь, - это вызов асинхронного метода, который я выполняю, происходит последовательно.Я добавляю задачи вызова в ConcurrentBag и жду задач в сумке.Мне плевать на результаты этих звонков, мне просто нужно подтверждение того, что они завершены.Однако эти вызовы происходят полностью последовательно, что очень запутанно.Этот метод выполняет несколько запросов PostgreSQL через Npgsql с параметризованными запросами.Вызывающая сторона получает дерево наших собственных данных и извлекает все узлы в дереве, перебирает узлы и выполняет с ними эту задачу.Я также использую пользовательский класс AsyncHelper, который будет перебирать задачи в реализации IEnumerable и ждать задач внутри него.И моя реализация Tree, и AsyncHelper были протестированы в другом фрагменте кода, который выполняет те же основные принципы этого кода, который выполняет задачи асинхронно, как и ожидалось.

Я добавил регистрацию вызовов функций, чтобы подтвердить этипроисходят последовательно.Я также вынимаю метод из пакета и просто запускаю метод, который все еще делает то же самое, это происходит последовательно и не будет продолжать мой цикл, пока не будет сделано.Все мои методы помечены как асинхронные, и я не ожидаю их до окончания цикла.

//method executing sequentially
public static async Task<List<ContactStatistic>> getContactStats(Guid tenantId, DateTime start, DateTime end, Breakdown breakdown) {
    if (!await Postgres.warmConnection(5)) { return null; }
    var hierarchy = await getTreeForTenant<TenantContactStatsNode>(tenantId);

    //perform calculations to determine stats for each element
    var calculationTasks = new ConcurrentBag<Task>();
    var allData = await hierarchy.getAllData();
    var timestampGotAllData = DateTime.Now;

    foreach (var d in allData) {
        calculationTasks.Add(d.getContactStats(start, end, breakdown));
    }

    Console.WriteLine("about to await all the tasks");
    //await the tasks to complete for calculations
    await AsyncHelper.waitAll(calculationTasks);
}


//method it's calling
public async Task getContactStats(DateTime start, DateTime end, Breakdown breakdown) {
    //perform two async postgres calls
    //await postgres calls
    //validate PG response
    //perform manipluation on this object with data from the queries
}

Я ожидаю, что первый вызов вызовет вторую функцию, добавит задачу в пакет и будет ждать ихпосле того, как это сделано.Что происходит на самом деле, так это то, что метод работает, завершается, затем добавляется в пакет.

* РЕДАКТИРОВАТЬ *

Ниже приведен полный код этого второго вызова в соответствии с запросом,Он принимает некоторые данные из базы данных, основанные на времени, заполняет промежутки между отснятыми временами, поэтому у нас есть полностью последовательный список возвратов, включающий все времена, когда данных в базе данных нет, и помещает их в переменную уровня объекта

public async Task getContactStats(DateTime start, DateTime end, Breakdown breakdown) {
    if (breakdown == Breakdown.Month) {
        //max out month start day to include all for the initial month in the initial count
        start = new DateTime(start.Year, start.Month, DateTime.DaysInMonth(start.Year, start.Month));
    } else {
        //day breakdown previous stats should start the day before given start day
        start = start.AddDays(-1);
    }

    var tran = new PgTran();
    var breakdownQuery = breakdown == Breakdown.Day ? Queries.GET_CONTACT_DAY_BREAKDOWN : Queries.GET_CONTACT_MONTH_BREAKDOWN;
    tran.setQueries(Queries.GET_CONTACT_COUNT_BEFORE_DATE, breakdownQuery);
    tran.setParams(new NpgsqlParameter("@tid", tenantId), new NpgsqlParameter("@start", start), new NpgsqlParameter("@end", end));
    var tranResults = await Postgres.getAll<ContactDayStatistic>(tran);
    //ensure transaction returns two query results
    if (tranResults == null || tranResults.Count != 2) { return; }


    //ensure valid past count was retrieved
    var prevCountResult = tranResults[0];
    if (prevCountResult == null || prevCountResult.Count != 1) { return; }
    var prevStat = new ContactDayStatistic(start.Day, start.Month, start.Year, prevCountResult[0].count);
    //ensure valid contact stat breakdown was retrieved
    var statBreakdown = tranResults[1];
    if (statBreakdown == null) { return;}

    var datesInBreakdown = new List<DateTime?>();
    //get all dates in the returned stats
    foreach (var e in statBreakdown) {
        var eventDate = new DateTime(e.year, e.month, e.day);
        if (datesInBreakdown.Find(item => item == eventDate) == null)
            datesInBreakdown.Add(eventDate);
    }
    //sort so they are sequential
    datesInBreakdown.Sort();

    //initialize timeline starting with initial breakdown
    var fullTimeline = new List<ContactStatistic>();
    //convert initial stat to the right type for final display
    fullTimeline.Add(breakdown == Breakdown.Month ? new ContactStatistic(prevStat) : prevStat);
    foreach (var d in datesInBreakdown) {
        //null date is useless, won't occur, nullable date just for default value of null
        if (d == null) { continue; }
        var newDate = d.Value;
        //fill gaps between last date given and this date
        ContactStatistic.fillGaps(breakdown, newDate, prevStat.getDate(), prevStat.count, ref fullTimeline, false);
        //get stat for this day
        var stat = statBreakdown.Find(item => d == new DateTime(item.year, item.month, item.day));
        if (stat == null) { continue; }
        //add last total for a rolling total of count
        stat.count += prevStat.count;
        fullTimeline.Add(breakdown == Breakdown.Month ? new ContactStatistic(stat) : stat);
        prevStat = stat;
    }
    //fill gaps between last date and end
    ContactStatistic.fillGaps(breakdown, end, prevStat.getDate(), prevStat.count, ref fullTimeline, true);
    //cast list to appropriate return type
    contactStats.Clear();
    contactStats = fullTimeline;
}

* РЕДАКТИРОВАТЬ 2 * Вот код, который AsyncHelper использует для ожидания этих задач.Эта функция отлично работает для другого кода, использующего ту же платформу, и она в основном просто для очистки кода, который должен ждать перечисленных задач.

public static async Task waitAll(IEnumerable<Task> coll) {
    foreach (var taskToWait in coll) {
        await taskToWait;
    }  
}

* РЕДАКТИРОВАТЬ 3 * Согласно рекомендации,Я изменил waitAll (), чтобы использовать Task.WhenAll () вместо цикла foreach, однако проблема все еще возникает.

public static async Task waitAll(IEnumerable<Task> coll) {
    await Task.WhenAll(coll);
}

* EDIT 4 * Чтобы убедиться, что это не PostgresЧтобы это произошло, я изменил второй метод, чтобы он выполнял только строку печати, а затем спал в течение 200 миллисекунд, чтобы сохранить путь выполнения чистым.Я все еще замечаю, что это происходит полностью последовательно (даже из-за истечения времени ожидания моего POST для этой функции, потому что реальный вызов занимает почти 20 мс).Ниже приведен код этого изменения, демонстрирующий

public async Task getContactStats(DateTime start, DateTime end, Breakdown breakdown) {
    Console.WriteLine("CALLED!");
    Thread.Sleep(200);
}

* РЕДАКТИРОВАТЬ 5 * В соответствии с рекомендацией я попытался использовать параллельный foreach, чтобы попытаться заполнить ConcurrentBag задач, а не обычный foreach.Здесь я сталкиваюсь с проблемой, когда параллельный foreach заканчивается после первого добавления и не добавляет все задачи сразу.

var calculationTasks = new ConcurrentBag<Task>();
var allData = await hierarchy.getAllData();
var timestampGotAllData = DateTime.Now;
Parallel.ForEach(allData, item => {
    Console.WriteLine("trying parallel foreach");
    calculationTasks.Add(item.getContactStats(start, end, breakdown));
});

Console.WriteLine("about to await all the tasks");
//await the tasks to complete for calculations
await AsyncHelper.waitAll(calculationTasks);

* РЕДАКТИРОВАТЬ 6 * Для визуальногоЯ запустил код и сделал несколько выводов, чтобы показать, что происходит странно.Выполнение кода выглядит следующим образом:

foreach (var d in allData) {
    Console.WriteLine("Adding call to bag");
    calculationTasks.Add(d.getContactStats(start, end, breakdown));
    Console.WriteLine("Done adding call to bag");
}

Выходные данные: https://i.imgur.com/3y5S4eS.png

Так как каждый раз печатается «CALLED», затем «Done!»перед «Завершено добавление вызова в bag» эти выполнения выполняются последовательно, а не асинхронно, как ожидалось.

Ответы [ 2 ]

0 голосов
/ 05 апреля 2019

Попробуйте:

foreach (var d in allData) 
{
    calculationTasks.Add(Task.Run(() => d.getContactStats(start, end, breakdown)));
}

//Other code here
//...

Task.WaitAll(calculationTasks.ToArray());

По сути, мы создаем задачу, которая будет «запускать» ваш метод.Затем мы ожидаем завершения этих задач.

Правда, я не совсем уверен, почему ваша версия блокируется, но, похоже, это помогает.

ОБНОВЛЕНИЕ:

Я тестировал, выводя идентификатор потока, и версия OP выполняет задачи в том же потоке.Возможно, поток блокируется сумкой, что заставляет новые задачи ждать?Мое предлагаемое решение приводит к различным идентификаторам потоков, которые, я думаю, объясняют, почему оно не блокируется.

0 голосов
/ 05 апреля 2019

Мой инстинкт инстинкта заключается в том, что это будет связано с транзакцией, которую вы открываете в своем методе. Немного сложно точно сказать, что происходит в вашем коде, так как здесь, кажется, есть несколько пользовательских классов - но потенциально ли происходит некоторая блокировка при открытии транзакции? Поскольку это происходит до вашего первого ожидания, он должен запускаться «последовательно» до ожидаемого кода.

Ваш пользовательский метод waitall, по-видимому, не является проблемой, но вы должны рассмотреть возможность его удаления и использования встроенного Task.WhenAll для их асинхронного ожидания.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...