Elastic Search блокирует плагин вложения - PullRequest
0 голосов
/ 20 февраля 2019

Я использую NEST (C #) и подключаемый модуль ingest attachment для загрузки десятков тысяч документов в экземпляр Elastic Search.К сожалению, через некоторое время все просто стоит на месте - то есть больше не принимаются документы.Журнал показывает:

[2019-02-20T17:35:07,528][INFO ][o.e.m.j.JvmGcMonitorService] [BwAAiDl] [gc][7412] overhead, spent [326ms] collecting in the last [1s]

Не уверен, что это кому-нибудь говорит?Кстати, есть ли более эффективные способы получения многих документов (вместо использования тысяч запросов REST)?

Я использую такой код:

client.Index(new Document
{
    Id = Guid.NewGuid(),
    Path = somePath,
    Content = Convert.ToBase64String(File.ReadAllBytes(somePath))
}, i => i.Pipeline("attachments"));

Определите конвейер:

client.PutPipeline("attachments", p => p
    .Description("Document attachment pipeline")
    .Processors(pr => pr
        .Attachment<Document>(a => a
        .Field(f => f.Content)
        .TargetField(f => f.Attachment)
        )
        .Remove<Document>(r => r
        .Field(f => f.Content)
        )
    )
);

1 Ответ

0 голосов
/ 21 февраля 2019

Журнал указывает, что на сборку мусора на стороне сервера Elasticsearch затрачивается значительное время;это может быть причиной больших событий остановки, которые вы видите.Если у вас включен мониторинг в кластере (в идеале экспорт таких данных в отдельный кластер), я бы посмотрел на анализ этих данных, чтобы увидеть, проливает ли он свет на то, почему происходит большой GC.

более эффективные способы получения большого количества документов (вместо использования тысяч запросов REST)?

Да, вы индексируете каждое вложение в отдельном запросе индекса.В зависимости от размера каждого вложения, закодированного в base64, вы можете захотеть отправить несколько в одном массовом запросе

// Your collection of documents
var documents = new[]
{
    new Document
    {
        Id = Guid.NewGuid(),
        Path = "path",
        Content = "content"
    },
    new Document
    {
        Id = Guid.NewGuid(),
        Path = "path",
        Content = "content" // base64 encoded bytes
    }
};

var client = new ElasticClient();

var bulkResponse = client.Bulk(b => b
    .Pipeline("attachments")
    .IndexMany(documents)
);

Если вы читаете документы из файловой системы, вы, вероятно, захотите их лениво перечислить и отправить большую частьЗапросы.Здесь вы также можете использовать вспомогательный метод BulkAll.

Сначала наберите лениво перечисляемый набор документов

public static IEnumerable<Document> GetDocuments()
{
    var count = 0;
    while (count++ < 20)
    {
        yield return new Document
        {
            Id = Guid.NewGuid(),
            Path = "path",
            Content = "content" // base64 encoded bytes
        };
    }
}

Затем настройте вызов BulkAll

var client = new ElasticClient();

// set up the observable configuration
var bulkAllObservable = client.BulkAll(GetDocuments(), ba => ba
    .Pipeline("attachments")
    .Size(10)
);

var waitHandle = new ManualResetEvent(false);

Exception exception = null;

// set up what to do in response to next bulk call, exception and completion
var bulkAllObserver = new BulkAllObserver(
    onNext: response => 
    {
        // perform some action e.g. incrementing counter
        // to indicate how many have been indexed
    },
    onError: e =>
    {
        exception = e;
        waitHandle.Set();
    },
    onCompleted: () =>
    {
        waitHandle.Set();
    });

// start the observable process
bulkAllObservable.Subscribe(bulkAllObserver);

// wait for indexing to finish, either forever,
// or set a max timeout as here.
waitHandle.WaitOne(TimeSpan.FromHours(1));

if (exception != null)
    throw exception;

Размер определяет количество документов для отправки в каждом запросе.Не существует жестких и быстрых правил относительно того, насколько большим он может быть для вашего кластера, поскольку он может зависеть от ряда факторов, включая конвейер загрузки, отображение документов, размер документов в байтах, аппаратное обеспечение кластера и т. Д. Вы можете настроитьможно повторить попытку документов, которые не были проиндексированы, и если вы видите es_rejected_execution_exception, вы находитесь на пределе возможностей, которые ваш кластер может обрабатывать одновременно.

Еще одна рекомендация - это идентификаторы документов,Я вижу, что вы используете новые направляющие для идентификаторов документов, что подразумевает, что вам все равно, какое значение имеет каждый документ.Если это так, я бы рекомендовал не отправлять значение Id, а вместо этого разрешить Elasticsearch генерировать идентификатор для каждого документа.Это очень вероятно приведет к улучшению производительности (я полагаю, что реализация немного изменилась в Elasticsearch и Lucene с момента публикации, но точка зрения остается неизменной) .

...