Параллельный вызов Elasticsearch через c# - PullRequest
0 голосов
/ 06 августа 2020

Я пытаюсь выполнить параллельный вызов индекса Elasticsearch для нескольких запросов и агрегировать результат, используя Parallel.ForEach.

Мой код:

private static List<SearchResponse<dynamic>> SearchQueryInParallel(string indexName, string lang, string[] eQueries)
{

    var result = new List<SearchResponse<dynamic>>();
    var exceptions = new ConcurrentQueue<Exception>();

    object mutex = new object();

    try
    {
        Parallel.ForEach(eQueries,
            () => new SearchResponse<dynamic>()
            , (q, loopState, subList) =>
            {

                var x = LowlevelClient.Search<SearchResponse<dynamic>>(indexName, $"article_{lang}", q);
                subList = x;
                return subList;

            }, subList =>
            {
                lock (result)
                    result.Add(subList);
            }
        );
    }
    catch (AggregateException ae)
    {
        foreach (var e in ae.InnerExceptions)
        {
            exceptions.Enqueue(e);
        }
    }

    if (exceptions.ToList().Any())
    {
        //there are exceptions, do something with them
        //do something?
    }

    return result;
}

Проблема, с которой я столкнулся в том, что sublist в приведенном выше случае это long. Это дает мне следующую ошибку:

Невозможно преобразовать из SearchResponse в long.

То же самое работает, когда я использовал без многопоточности, код:

var items = new List<dynamic>();

var searchResponse = lowLevelClient.Search<SearchResponse<dynamic>>(elasticIndexName, $"article_{languageCode.ToLowerInvariant()}", query);

foreach (var document in searchResponse.Body.Documents)
{
    items.Add(document);
} 

Любая помощь, пожалуйста? Если у кого-то есть другой способ добиться параллельного вызова и агрегирования данных из возвращаемых значений, мы будем очень признательны.

...