BulkIndexer в пакете Olivere для Golang, чтобы заменить Elastigo - PullRequest
0 голосов
/ 22 ноября 2018

Я заметил, что могу использовать BulkIndexer, если я хочу отправить данные в эластичный поиск оптом.Как указано в документации Elastigo

Массовый индексатор создает подпрограммы и каналы для подключения и отправки данных вasticsearch оптом, используя буферы.

Код в elastigo для вставкиоптом

var c_es = elastigo.NewConn()
var indexer = c_es.NewBulkIndexer(50)

func insertInBulkElastic(){
    //Create a custom error function when inserting data into elasticsearch 
   //in bulk
    indexer.Sender = func(buf *bytes.Buffer) error {
    // @buf is the buffer of docs about to be written
    respJson, err := c_es.DoCommand("POST", "/_bulk", nil, buf)
    if err != nil {
        // handle it better than this

        fmt.Println("Error", string(respJson)) // 

        fmt.Println("Error", err)
    }

    if err == nil {
        fmt.Println("The data was inserted successfullly to elastic search")
    }
    return err
  }



}

Кто-нибудь знает, как отправить массовый запрос с использованием olivere для golang?

Спасибо

1 Ответ

0 голосов
/ 22 ноября 2018

Вот рабочий пример использования olivere в Go.Вы можете прочитать больше о BulkProcessor здесь

Надеюсь, эта помощь:)

package main

import (
    "context"
    "log"
    "time"

    elastic "gopkg.in/olivere/elastic.v5"
)

func main() {
    options := []elastic.ClientOptionFunc{
        elastic.SetHealthcheck(true),
        elastic.SetHealthcheckTimeout(20 * time.Second),
        elastic.SetSniff(false),
        elastic.SetHealthcheckInterval(30 * time.Second),
        elastic.SetURL("http://127.0.0.1:9200"),
        elastic.SetRetrier(elastic.NewBackoffRetrier(elastic.NewConstantBackoff(5 * time.Second))),
    }
    client, err := elastic.NewClient(options...)
    if err != nil {
        panic(err)
    }
    // ensure index exist
    exists, err := client.IndexExists("my_index").Do(context.Background())
    if err != nil {
        panic(err)
    }
    if !exists {
        if _, err := client.CreateIndex("my_index").Do(context.Background()); err != nil {
            panic(err)
        }
    }
    client.PutMapping().Index("my_index").BodyJson(map[string]interface{}{
        "properties": map[string]string{
            "name": "keyword",
        },
    }).Do(context.Background())

    // create new bulk processor from client
    bulkProcessor, err := elastic.NewBulkProcessorService(client).
        Workers(5).
        BulkActions(1000).
        FlushInterval(1 * time.Second).
        After(after).
        Do(context.Background())

    // now the bulk processor can be reused for entire the app
    myDoc := struct {
        Name string
    }{
        Name: "jack",
    }
    req := elastic.NewBulkIndexRequest()
    req.Index("my_index").Type("type").Id("my_doc_id").Doc(myDoc)

    // Use Add method to add request into the processor
    bulkProcessor.Add(req)

    // wait for sometime...
    time.Sleep(5 * time.Second)
}

func after(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
    if err != nil {
        log.Printf("bulk commit failed, err: %v\n", err)
    }
    // do what ever you want in case bulk commit success
    log.Printf("commit successfully, len(requests)=%d\n", len(requests))
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...