Многопоточность в цикле foreach - PullRequest
1 голос
/ 30 апреля 2020

Я прочитал несколько потоков stackoverflow о многопоточности в foreach l oop, но я не уверен, что понимаю и правильно его использую.
Я пробовал несколько сценариев ios, но я не вижу большого увеличения производительности.

Вот что я считаю выполняет асинхронные задачи , но работает синхронно в l oop, используя единственный поток :

Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();

foreach (IExchangeAPI selectedApi in selectedApis)
{
    if (exchangeSymbols.TryGetValue(selectedApi.Name, out symbol))
    {
        ticker = await selectedApi.GetTickerAsync(symbol);
    }               
}    
stopWatch.Stop();

Вот что я надеялся, что будет работать асинхронно (все еще используя однопоточный ) - я бы ожидал некоторого улучшения скорости уже здесь:

List<Task<ExchangeTicker>> exchTkrs = new List<Task<ExchangeTicker>>();
stopWatch.Start();

foreach (IExchangeAPI selectedApi in selectedApis)
{
    if (exchangeSymbols.TryGetValue(selectedApi.Name, out symbol))
    {
        exchTkrs.Add(selectedApi.GetTickerAsync(symbol));
    }
}

ExchangeTicker[] retTickers = await Task.WhenAll(exchTkrs);
stopWatch.Stop();

Вот то, что я хотел бы запустить асинхронно в многопоточности :

stopWatch.Start();

Parallel.ForEach(selectedApis, async (IExchangeAPI selectedApi) =>
{
    if (exchangeSymbols.TryGetValue(selectedApi.Name, out symbol))
    {
        ticker = await selectedApi.GetTickerAsync(symbol);
    }
});
stopWatch.Stop();

Результаты секундомера интерпретируются следующим образом:

Console.WriteLine("Time elapsed (ns): {0}", stopWatch.Elapsed.TotalMilliseconds * 1000000);

Выводы консоли:

Time elapsed (ns): 4183308100
Time elapsed (ns): 4183946299.9999995
Time elapsed (ns): 4188032599.9999995

Теперь улучшение скорости выглядит незначительным. Я делаю что-то не так или это более или менее то, что я должен ожидать? Я полагаю, что лучше проверить это при записи в файлы.
Не могли бы вы также подтвердить, что я правильно интерпретирую различные варианты использования?

Наконец, использование foreach l oop для одновременного получения тикера с нескольких платформ может быть не лучшим подходом. Приветствуются предложения по улучшению этого.

EDIT

Обратите внимание, что я использую базу кода ExchangeSharp, которую вы можете найти здесь

Здесь Вот как выглядит метод GerTickerAsyn c ():

public virtual async Task<ExchangeTicker> GetTickerAsync(string marketSymbol)
{
    marketSymbol = NormalizeMarketSymbol(marketSymbol);
    return await Cache.CacheMethod(MethodCachePolicy, async () => await OnGetTickerAsync(marketSymbol), nameof(GetTickerAsync), nameof(marketSymbol), marketSymbol);
}

Для Kraken API у вас есть:

protected override async Task<ExchangeTicker> OnGetTickerAsync(string marketSymbol)
{
    JToken apiTickers = await MakeJsonRequestAsync<JToken>("/0/public/Ticker", null, new Dictionary<string, object> { { "pair", NormalizeMarketSymbol(marketSymbol) } });
    JToken ticker = apiTickers[marketSymbol];
    return await ConvertToExchangeTickerAsync(marketSymbol, ticker);
}

И метод кэширования:

public static async Task<T> CacheMethod<T>(this ICache cache, Dictionary<string, TimeSpan> methodCachePolicy, Func<Task<T>> method, params object?[] arguments) where T : class
{
    await new SynchronizationContextRemover();
    methodCachePolicy.ThrowIfNull(nameof(methodCachePolicy));
    if (arguments.Length % 2 == 0)
    {
        throw new ArgumentException("Must pass function name and then name and value of each argument");
    }
    string methodName = (arguments[0] ?? string.Empty).ToStringInvariant();
    string cacheKey = methodName;
    for (int i = 1; i < arguments.Length;)
    {
        cacheKey += "|" + (arguments[i++] ?? string.Empty).ToStringInvariant() + "=" + (arguments[i++] ?? string.Empty).ToStringInvariant("(null)");
    }
    if (methodCachePolicy.TryGetValue(methodName, out TimeSpan cacheTime))
    {
        return (await cache.Get<T>(cacheKey, async () =>
        {
            T innerResult = await method();
            return new CachedItem<T>(innerResult, CryptoUtility.UtcNow.Add(cacheTime));
        })).Value;
    }
    else
    {
        return await method();
    }
}

Ответы [ 2 ]

1 голос
/ 30 апреля 2020

Сначала следует указать, что вы пытаетесь достичь производительности , а не асинхронности. И вы пытаетесь достичь этого, выполняя несколько операций одновременно , а не параллельно . Для простоты объяснения я буду использовать упрощенную версию вашего кода и буду считать, что каждая операция - это прямой веб-запрос, без промежуточного уровня кэширования и без зависимостей от значений, существующих в словарях.

foreach (var symbol in selectedSymbols)
{
    var ticker = await selectedApi.GetTickerAsync(symbol);
}

Приведенный выше код выполняет операции последовательно. Каждая операция начинается после завершения предыдущей.

var tasks = new List<Task<ExchangeTicker>>();
foreach (var symbol in selectedSymbols)
{
    tasks.Add(selectedApi.GetTickerAsync(symbol));
}
var tickers = await Task.WhenAll(tasks);

Приведенный выше код выполняет операции одновременно. Все операции начинаются сразу. Ожидается, что общая продолжительность будет равна продолжительности самой продолжительной операции.

Parallel.ForEach(selectedSymbols, async symbol =>
{
    var ticker = await selectedApi.GetTickerAsync(symbol);
});

Приведенный выше код выполняет операции одновременно, как и предыдущая версия с Task.WhenAll. Это не дает никаких преимуществ, но при этом имеет огромный недостаток: у вас больше нет способа await выполнить операции. Метод Parallel.ForEach вернется сразу после запуска операций, потому что Parallel класс не понимает asyn c делегатов (он не принимает Func<Task> лямбда-выражения). По сути, там есть куча async void лямбд, которые выходят из-под контроля, и в случае исключения они остановят процесс.

Так что правильный путь Запускать операции одновременно можно вторым способом, используя список задач и Task.WhenAll. Поскольку вы уже измерили этот метод и не заметили каких-либо улучшений производительности, я предполагаю, что есть что-то еще, что сериализует параллельные операции. Это может быть что-то вроде SemaphoreSlim, скрытое где-то в вашем коде, или какой-то механизм на стороне сервера, который душит ваши запросы. Вам нужно будет продолжить расследование, чтобы выяснить, где и почему происходит удушение.

1 голос
/ 30 апреля 2020

В общем случае, когда вы не видите увеличения при многопоточности, это происходит потому, что ваша задача не ограничена или не достаточно велика для загрузки процессора.

В вашем примере, например:

selectedApi.GetTickerAsyn c (символ);

Это может иметь две причины:

1: поиск тикера очень быстрый и не должен быть asyn c для начала. Т.е., когда вы ищите его в словаре.

2: Это выполняется через соединение http, где среда выполнения ОГРАНИЧИВАЕТ КОЛИЧЕСТВО СОПУТСТВУЮЩИХ ВЫЗОВОВ. Независимо от того, сколько задач вы открываете, он не будет использовать более 4 одновременно.

О, и 3: вы думаете, что asyn c использует потоки. Не то. В кодекле это особенно не так:

ждут selectedApi.GetTickerAsyn c (символ);

Где вы в основном НЕМЕДЛЕННО ЖДЕТЕ РЕЗУЛЬТАТА. Здесь вообще нет многопоточности.

foreach (IExchangeAPI selectedApi в selectedApis) {if (exchangeSymbols.TryGetValue (selectedApi.Name, out out symbol)) {ticker = await selectedApi.GetTickerAsyn c(символ); }}

Это линейный не поточный код, использующий интерфейс asyn c, чтобы не блокировать текущий поток, пока выполняется операция (вероятно, дорогостоящая IO). Он начинается один, потом ждет результата. Никакие 2 запроса никогда не запускаются одновременно.

Если вы хотите (более в качестве примера) более масштабируемый способ:

  • В foreach не ждите, но добавьте задачу к списку задач.
  • Затем начните ждать, как только все задачи начались. Как во 2-м * oop.

ПУТЬ не идеальна, но, по крайней мере, среда выполнения имеет ШАНС для выполнения нескольких поисков одновременно. Ваше ожидание гарантирует, что вы, по сути, выполняете однопоточный код, за исключением asyn c, поэтому ваш поток возвращается в пул (и не ожидает результатов), увеличивая масштабируемость - элемент, возможно, не имеет значения в этом случае и определенно не измерено в вашем тесте.

...