Я новичок в Java ThreaPoolExecutor
и написал несколько задач для индексации документов в упругом поиске.Через ThreaPoolExecutor
я выполняю эту задачу и она работает нормально.
Но я все еще не совсем уверен в моем подходе.
Пожалуйста, найдите мой код ниже
public class IndexApp {
public static void main(String[] args)
{
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
Map<String, Object> jsonMap ;
System.out.println("Indexing via Java Code ....");
Product prod1=new Product("1001", 123172l, "Product", "VG3000");
Product prod2=new Product("1002", 123172l, "Series", "Valves, VG3000");
Product prod3=new Product("1003", 3536633, "Series", "Activa RoofTop, VG3000 karthikeyan ");
Product prod4=new Product("1004", 123172l, "Product", "Activa RoofTop VG3000, 3000");
List<Product> objList=new ArrayList<Product>();
objList.add(prod1);
objList.add(prod2);
objList.add(prod3);
objList.add(prod4);
for(int i=0;i<objList.size();i++)
{
jsonMap = new HashMap<String, Object>();
jsonMap.put("id", objList.get(i).getId());
jsonMap.put("catalog_id", objList.get(i).getCatalog_id());
jsonMap.put("catalog_type", objList.get(i).getCatalog_type());
jsonMap.put("values", objList.get(i).getValues());
IndexTask task = new IndexTask(jsonMap);
executor.execute(task);
}
executor.shutdown();
}
}
public class IndexTask implements Runnable {
private final static String INDEX_NAME = "index_prod";
Product prod=new Product();
IndexRequest request;
Map<String, Object> jsonMap ;
public IndexTask(Map<String, Object> jsonMap ) {
this.jsonMap = jsonMap;
}
public Map<String, Object> getJsonMap() {
return jsonMap;
}
public void run() {
try {
Long duration = (long) (Math.random() * 10);
System.out.println("Executing : "+jsonMap.get("id")+" Sleep Duration : "+duration );
request = new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString() )
.source(jsonMap);
try {
IndexResponse response = SearchEngineClient.getInstance3().index(request); // increased timeout
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
}
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Кто-нибудь, дайте мне знать, что мой подход имеет полное значение для индексации документов в упругом поиске .?
Обновление 2
Пожалуйста, найдите мой измененный код.
Вместо использования IndexRequest
я использовал BulkRequest
public class ProdCatIndexTask implements Runnable {
private final static String INDEX_NAME = "productcatalog_index";
Product prod=new Product();
IndexRequest request;
Map<String, Object> jsonMap ;
BulkRequest bulkRequest = new BulkRequest();
public ProdCatIndexTask(Map<String, Object> jsonMap ) {
this.jsonMap = jsonMap;
}
public Map<String, Object> getJsonMap() {
return jsonMap;
}
public void run() {
try {
Long duration = (long) (Math.random() * 10);
System.out.println("Executing : "+jsonMap.get("id")+" Sleep Duration : "+duration );
/*request = new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString() )
.source(jsonMap);*/
bulkRequest.add( new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString()).source(jsonMap));
try {
//IndexResponse response = SearchEngineClient.getInstance3().index(request); // increased timeout
BulkResponse bulkResponse = SearchEngineClient.getInstance3().bulk(bulkRequest);
System.out.println("Triggered Bulk Request.....");
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
}
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}