Похоже, вы неправильно поняли API клиента; вам не нужно указывать _Shards
, Hit
, Hits
, _Source
и т. д., клиент позаботится о десериализации этих частей API Elasticsearch.
Единственная часть, которую вам нужно определить - это POCO, который будет сопоставляться с объектом JSON в каждом поле "_source"
в ответе, т.е.
{
"duration" : 14986283,
"group_id" : "com",
"var_time" : "2018-04-24T17:05:13.082+02:00",
"var_name" : "2",
}
, что похоже на _Source
POCO (хотя я был бы склонен дать ему более осмысленное имя!). Давайте сейчас просто назовем это MyDocument
.
С MyDocument
определяется как
public class MyDocument
{
[PropertyName("duration")]
public int Duration { get; set; }
[PropertyName("group_id")]
public string GroupId { get; set; }
[PropertyName("var_time")]
public DateTime Time { get; set; }
[PropertyName("var_name")]
public string Name { get; set; }
}
Простой поиск будет
var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
var settings = new ConnectionSettings(pool)
.DefaultMappingFor<MyDocument>(m => m
.IndexName("test_index")
.TypeName("doc")
);
var client = new ElasticClient(settings);
var searchResponse = client.Search<MyDocument>();
// A collection of the top 10 matching documents
var documents = searchResponse.Documents;
DefaultMappingFor<MyDocument>(...)
будет использовать имя индекса "test_index"
и имя типа "doc
" всякий раз, когда универсальный тип документа равен MyDocument
, и они явно не определены в запросе.
Вышеуказанный поиск создает следующий запрос к Elasticsearch
POST http://localhost:9200/test_index/doc/_search
{}
Похоже, вы хотите использовать Scroll API для возврата всех соответствующих документов. Чтобы сделать это с помощью API прокрутки, вы должны написать цикл, чтобы продолжать делать запросы прокрутки, пока документы возвращаются
var searchResponse = client.Search<MyDocument>(s => s
.Size(1000)
.Scroll("30s")
);
while (searchResponse.Documents.Any())
{
foreach (var document in searchResponse.Documents)
{
// do something with this set of 1000 documents
}
// make an additional request
searchResponse = client.Scroll<MyDocument>("30s", searchResponse.ScrollId);
}
// clear scroll id at the end
var clearScrollResponse = client.ClearScroll(c => c.ScrollId(searchResponse.ScrollId));
Существует ScrollAll
наблюдаемый помощник, который вы можете использовать для упрощения написания, и распараллеливает операцию, используя sliced_scroll
. Та же операция, что и выше, но с использованием ScrollAll
// set to number of shards in targeted indices
var numberOfSlices = 4;
var scrollAllObservable = client.ScrollAll<MyDocument>("30s", numberOfSlices);
Exception exception = null;
var manualResetEvent = new ManualResetEvent(false);
var scrollAllObserver = new ScrollAllObserver<MyDocument>(
onNext: s =>
{
var documents = s.SearchResponse.Documents;
foreach (var document in documents)
{
// do something with this set of documents
}
},
onError: e =>
{
exception = e;
manualResetEvent.Set();
},
onCompleted: () => manualResetEvent.Set()
);
scrollAllObservable.Subscribe(scrollAllObserver);
manualResetEvent.WaitOne();
if (exception != null)
throw exception;
Если вам не нужен полный контроль над наблюдателем, вы можете использовать упрощенную версию. При этом вам необходимо указать максимальное время выполнения всей операции, хотя
var numberOfSlices = 4;
var scrollAllObservable = client.ScrollAll<MyDocument>("30s", numberOfSlices)
.Wait(TimeSpan.FromHours(2), onNext: s =>
{
var documents = s.SearchResponse.Documents;
foreach (var document in documents)
{
// do something with this set of documents
}
});