Структура данных, используемая при сборе ответа от асинхронных вызовов - PullRequest
0 голосов
/ 17 ноября 2018

Я запускаю этот кусок кода в моем приложении.

public Task<BulkResponse<JObject>> GetRelatedObjectsAsync(IEnumerable<PrimaryObjectInfo> primaryObjectInfos)
{
    var allSecondaries = new List<Tuple<int, List<JObject>>>();
    var exceptionsDict = new ConcurrentDictionary<int, Exception>();

    var relatedObjectsTasks = primaryObjectInfos.Select(async primaryObjectInfo =>
    {
        try
        {
            var secondaryObject = await objectManager.GetRelatedObjectsAsync(primaryObjectInfo);
            allSecondaries.Add(Tuple.Create(primaryObjectInfo.Index, secondaryObject.ToList()));
        }
        catch (Exception ex)
        {
            exceptionsDict.TryAdd(primaryObjectInfo.Index,  ex);
        }
    });

    await Task.WhenAll(relatedObjectsTasks);

    return ConvertToBulkResponse(allSecondaries, exceptionsDict);
}

Когда я запускаю этот код, allSecondaries объект иногда возвращает действительный список результатов, а иногда код заканчивает тем, что ловит исключения для параллельных потоков, которые у меня есть для каждого primaryObjectInfo.

Асинхронный метод objectManager.GetRelatedObjectsAsync() внутренне вызывает 4-5 асинхронных функций, и есть функции, в которых параметры передаются по ссылке. (ключевое слово ref)

Вопрос: Использую ли я правильную структуру данных для консолидации результатов всех параллельных потоков? Если да, с какой стати я каждый раз получаю разные результаты?

1 Ответ

0 голосов
/ 18 ноября 2018

Лучше разделить выполнение от сбора результатов:

public Task<BulkResponse<JObject>> GetRelatedObjectsAsync(
    IEnumerable<PrimaryObjectInfo> primaryObjectInfos)
{
    var relatedObjectsTasks = primaryObjectInfos
        .Select(
            async primaryObjectInfo =>
                (primaryObjectInfo.Index,
                 await objectManager.GetRelatedObjectsAsync(primaryObjectInfo)))
        .ToList();

    try
    {
        await Task.WhenAll(relatedObjectsTasks);
    }
    catch
    {
        // observe each task's, exception
    }

    var allSecondaries = new List<(int index, List<JObject> related)>();
    var exceptionsDict = new Dictionary<int, Exception>();

    foreach (var relatedObjectsTask in relatedObjectsTasks)
    {
        try
        {
            allSecondaries.Add(relatedObjectsTask.Result);
        }
        catch (Exception ex)
        {
            exceptionsDict.Add(relatedObjectsTask.index,  ex);
        }
    }

    return ConvertToBulkResponse(allSecondaries, exceptionsDict);
}

Вы можете просмотреть свойства IsFaulted / Exception и IsCancelled каждой задачи вместо создания исключений:

    foreach (var relatedObjectsTask in relatedObjectsTasks)
    {
        if (relatedObjectsTask.IsCancelled)
        {
            exceptionsDict.Add(
                relatedObjectsTask.index,
                new TaskCancelledException(relatedObjectsTask));
        }
        else if (relatedObjectsTask.IsFaulted)
        {
            exceptionsDict.TryAdd(relatedObjectsTask.index,  ex);
        }
        else
        {
            allSecondaries.Add(relatedObjectsTask.Result);
        }
    }
...