Java ThreadPoolExecutor для индексации документов в ElasticSearch - PullRequest
0 голосов
/ 10 октября 2018

Я новичок в Java ThreaPoolExecutor и написал несколько задач для индексации документов в упругом поиске.Через ThreaPoolExecutor я выполняю эту задачу и она работает нормально.

Но я все еще не совсем уверен в моем подходе.

Пожалуйста, найдите мой код ниже

public class IndexApp {

    public static void main(String[] args)
    {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        Map<String, Object> jsonMap ;

        System.out.println("Indexing via Java Code ....");
        Product prod1=new Product("1001", 123172l, "Product", "VG3000");
        Product prod2=new Product("1002", 123172l, "Series", "Valves, VG3000");
        Product prod3=new Product("1003", 3536633, "Series", "Activa RoofTop, VG3000 karthikeyan ");
        Product prod4=new Product("1004", 123172l, "Product", "Activa RoofTop VG3000, 3000");

        List<Product> objList=new ArrayList<Product>();
        objList.add(prod1);
        objList.add(prod2);
        objList.add(prod3);
        objList.add(prod4);

        for(int i=0;i<objList.size();i++)
        {
            jsonMap = new HashMap<String, Object>();
            jsonMap.put("id", objList.get(i).getId());
            jsonMap.put("catalog_id", objList.get(i).getCatalog_id());
            jsonMap.put("catalog_type", objList.get(i).getCatalog_type());
            jsonMap.put("values", objList.get(i).getValues());
            IndexTask task = new IndexTask(jsonMap);
            executor.execute(task);
        }
         executor.shutdown();
    }

}


public class IndexTask implements Runnable {

private final static String INDEX_NAME = "index_prod";

Product prod=new Product();
IndexRequest request;
Map<String, Object> jsonMap ;

public IndexTask(Map<String, Object> jsonMap ) {
    this.jsonMap = jsonMap;
}

public Map<String, Object> getJsonMap() {
    return jsonMap;
}

public void run() {
    try {
        Long duration = (long) (Math.random() * 10);
        System.out.println("Executing : "+jsonMap.get("id")+" Sleep Duration : "+duration );

        request = new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString() )
                .source(jsonMap);

        try {
            IndexResponse response = SearchEngineClient.getInstance3().index(request); // increased timeout 
        } catch(ElasticsearchException e) {
            if (e.status() == RestStatus.CONFLICT) {
            }
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        TimeUnit.SECONDS.sleep(duration);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

}

Кто-нибудь, дайте мне знать, что мой подход имеет полное значение для индексации документов в упругом поиске .?

Обновление 2

Пожалуйста, найдите мой измененный код.

Вместо использования IndexRequest я использовал BulkRequest

public class ProdCatIndexTask implements Runnable {

    private final static String INDEX_NAME = "productcatalog_index";

    Product prod=new Product();
    IndexRequest request;
    Map<String, Object> jsonMap ;

    BulkRequest bulkRequest = new BulkRequest();

    public ProdCatIndexTask(Map<String, Object> jsonMap ) {
        this.jsonMap = jsonMap;
    }

    public Map<String, Object> getJsonMap() {
        return jsonMap;
    }

    public void run() {
        try {
            Long duration = (long) (Math.random() * 10);
            System.out.println("Executing : "+jsonMap.get("id")+" Sleep Duration : "+duration );

            /*request = new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString() )
                    .source(jsonMap);*/

            bulkRequest.add( new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString()).source(jsonMap));

            try {
                //IndexResponse response = SearchEngineClient.getInstance3().index(request); // increased timeout
                BulkResponse bulkResponse = SearchEngineClient.getInstance3().bulk(bulkRequest);
                System.out.println("Triggered Bulk Request.....");
            } catch(ElasticsearchException e) {
                if (e.status() == RestStatus.CONFLICT) {
                }
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }

            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

1 Ответ

0 голосов
/ 11 октября 2018

Если вы хотите загружать данные в массовом режиме параллельно, я рекомендую использовать ElasticSearch API BulkProcessor.

Вот https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html.

Пример использования массового процессора:

bulkProcessor.add(new IndexRequest("indexName", "type")
.source(toJson(Product), XContentType.JSON);

Если вы хотите сделать быстрее, вы можете уменьшить количество реплик до 0 и позволить ElasticSearch генерировать идентификаторы, потому что, если вы индексируете свой собственный идентификатор, каждый раз ElasticSearch проверяет, существует ли этот идентификатор в ElasticSearch.

Другие идеи о том, как повысить производительность загрузки:

https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html

...