Я пытаюсь выполнить параллельный вызов индекса Elasticsearch для нескольких запросов и агрегировать результат, используя Parallel.ForEach
.
Мой код:
private static List<SearchResponse<dynamic>> SearchQueryInParallel(string indexName, string lang, string[] eQueries)
{
var result = new List<SearchResponse<dynamic>>();
var exceptions = new ConcurrentQueue<Exception>();
object mutex = new object();
try
{
Parallel.ForEach(eQueries,
() => new SearchResponse<dynamic>()
, (q, loopState, subList) =>
{
var x = LowlevelClient.Search<SearchResponse<dynamic>>(indexName, $"article_{lang}", q);
subList = x;
return subList;
}, subList =>
{
lock (result)
result.Add(subList);
}
);
}
catch (AggregateException ae)
{
foreach (var e in ae.InnerExceptions)
{
exceptions.Enqueue(e);
}
}
if (exceptions.ToList().Any())
{
//there are exceptions, do something with them
//do something?
}
return result;
}
Проблема, с которой я столкнулся в том, что sublist
в приведенном выше случае это long
. Это дает мне следующую ошибку:
Невозможно преобразовать из SearchResponse в long.
То же самое работает, когда я использовал без многопоточности, код:
var items = new List<dynamic>();
var searchResponse = lowLevelClient.Search<SearchResponse<dynamic>>(elasticIndexName, $"article_{languageCode.ToLowerInvariant()}", query);
foreach (var document in searchResponse.Body.Documents)
{
items.Add(document);
}
Любая помощь, пожалуйста? Если у кого-то есть другой способ добиться параллельного вызова и агрегирования данных из возвращаемых значений, мы будем очень признательны.