Невозможно получить блокировку синхронизированного метода при доступе через несколько потоков - PullRequest
0 голосов
/ 07 мая 2018

У меня есть код ниже, где я тестирую синхронизацию потоков для упругого поиска, но каким-то образом я не могу добиться успеха в этом, кто-нибудь может дать мне знать, где я иду не так? Если я включаю спящий поток внутри метода startThreadProcess, то все работает нормально, потому что он спит в течение определенного периода времени. Что я не хочу, я хочу получить правильную блокировку для потока без использования сна сна. но то, что происходит в приведенном выше коде, я использовал Executor для объединения потоков. Там я запускаю цикл для счетчика 4, чтобы инициировать 4 потока из пула. поэтому, когда мой поток отправляется с использованием executor submit, который вызывает синхронизированный метод, и внутри этого синхронизированного метода я передаю вызов другому синхронизированному методу, откуда я получаю общее количество счетчиков в конкретном узле и продолжаю работу с этим счетом, увеличивая, чтобы вставить новый документ, где мой первый поток еще не завершен. 2-й поток входит и пытается получить общее количество счетчиков из метода, который вызывается из синхронизированного метода, поэтому я получаю неправильный счетчик для потока 2, так как мой первый поток вставит 10000 документов json внутри узла, так что Я ожидаю, что поток 2 должен получить число 10000, а затем он должен обработать вставку, но здесь мой поток 2 входит между ними и получает количество случайных чисел и начинает вставку, увеличивая число, которое не является ожидаемым сценарием

package com.acn.adt.main;

public class ESTest {

    private static final String dataPath = "C:\\Elastic Search\\Data.json";
    static ESTest esTest = new ESTest();
    private static TransportClient client = null;
    private Properties elasticPro = null;
    private InputStream input = null;

    ElasticSearchCrud esCRUD = null;

    private final Object lock = new Object();

    public static void main(String[] args) {
        String strArray[] = new String[] {"1"};
        esTest.startProcess(strArray);
    }

    public void startProcess(String strArray[]) {
        try {
        input = new FileInputStream(ElasticSearchConstants.ELASTIC_PROPERTIES);
        elasticPro = new Properties();
        //elasticPro.load(ElasticSearchClient.class.getResourceAsStream(ElasticSearchConstants.ELASTIC_PROPERTIES));
        elasticPro.load(input);
        System.out.println(elasticPro.getProperty("homeDir"));

        long startTime = System.currentTimeMillis();
        Settings setting = Settings.builder()
        //.put("client.transport.ping_timeout", "100s")
        .put("cluster.name", elasticPro.getProperty("cluster"))
        //.put("node.name", elasticPro.getProperty("node"))
        //.put("client.transport.sniff", Boolean.valueOf(elasticPro.getProperty("transport.sniff")))
        .put("client.transport.sniff", false)
        .put("cluster.routing.allocation.enable", "all")
        .put("cluster.routing.allocation.allow_rebalance", "always")
        //.put("client.transport.ignore_cluster_name", true)
        .build();

        client = new PreBuiltTransportClient(setting)
        .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 
                    Integer.valueOf("9300")));

        long endTime = System.currentTimeMillis();

        System.out.println("Time taken for connecting " + TimeUnit.MILLISECONDS.toSeconds((endTime - startTime)));


        ExecutorService executorService = Executors.newFixedThreadPool(10);

        for(int i = 1; i <=4; i++) {

            if(i==1) {
                strArray = new String [] {"1"};
            }else if(i == 2) {
                strArray = new String [] {"1"};
            }else if(i == 3) {
                strArray = new String [] {"1"};
            }else if(i == 4) {
                strArray = new String [] {"1"};
            }

            executorService.execute(new ESThread(esTest,strArray,i));

        }

        }catch(Exception e) {

        }

    }

    public class ESThread implements Runnable {
        private final Object lock = new Object();
        ESTest esTester = null;
        String strArr [] = null;
        int i =0;
        public ESThread(ESTest esTester,String[] strArr,int i)  {
            this.esTester = esTester;
            this.strArr = strArr;
            this.i = i;
        }

        @Override
        public void run() {
            System.out.println("Name of Current thread is Thread_"+i);
            synchronized(lock) {
                esTester.startCRUDProcess(strArr);
            }
            System.out.println("Thread_"+i+" done.");
        }
    }

    public void startCRUDProcess(String [] strArr) {

        SearchAPI esSearch = new SearchAPIImpl();
        boolean caseFlg = false;
        String _indexName = "gcindex";
        String _indexType = "gctype";
        String _ids = "501,602,702#@#1,10000,10001";
        String _id = "10000";
        String[] _strIds = new String[] {"10000","9999"};

        System.out.println("Insert Multiple Process is started...");
        System.out.println("--------------------------------------");
        try {
            caseFlg = insertMultipleDocument(dataPath,client,_indexName,_indexType);
        } catch (IOException | ParseException e) {
            e.printStackTrace();
            caseFlg = false;
        }

    }

    public synchronized boolean insertMultipleDocument(String dataPath,TransportClient client,String _indexName,String _indexType) throws FileNotFoundException, ParseException {

        try {

            JSONParser parser = new JSONParser();
            // we know we get an array from the example data
            JSONArray jsonArray = (JSONArray) parser.parse( new FileReader( dataPath ) );

            BulkRequestBuilder bulkDocument = client.prepareBulk();

            @SuppressWarnings("unchecked")
            Iterator<JSONObject> it = jsonArray.iterator();
            int i = 0;

            i = _getTotalHits(client,_indexName,_indexType);

            System.out.println("Total number of hits inside index = "+_indexName+" of type = "+_indexType+" are : "+i);
            System.out.println("-------------------------------------------------------------------------------------");
            while( it.hasNext() ) {
                i++;
                JSONObject json = it.next();
                System.out.println("Insert document for "+i+": " + json.toJSONString() );   

                // either use client#prepare, or use Requests# to directly build index/delete requests
                bulkDocument.add(client.prepareIndex(_indexName, _indexType, i+"")
                        .setSource(json.toJSONString(), XContentType.JSON )
                        );
            }

            BulkResponse bulkResponse = bulkDocument.get();
            if (bulkResponse.hasFailures()) {
                System.out.println("process failures by iterating through each bulk response item : "+bulkResponse.buildFailureMessage());
                return false;
            } else {
                System.out.println("All Documents inserted successfully...");
                /*if(bulkResponse.getItems()!=null) {
                    for(BulkItemResponse response:bulkResponse.getItems()) {
                        System.out.println(response.toString());
                        System.out.println(response.getResponse());
                    }
                }*/
                return true;
            }

        } catch (IOException ex) {
            System.out.println("Exception occurred while get Multiple Document : " + ex/*, ex*/);
            return false;
        }
    }   

    public synchronized int _getTotalHits(TransportClient client,String _indexName,String _indexType){
        SearchHits hits = null;
        int recCount = 0;
        long totalCount = 0;
        try {
            SearchResponse seacrhResponse = client.prepareSearch(_indexName)
                    .setTypes(_indexType)
                    .setSearchType(SearchType.QUERY_THEN_FETCH)
                    .get();

            if (seacrhResponse != null) {
                hits = seacrhResponse.getHits();
                totalCount = hits.getTotalHits();
                System.out.println("count = "+totalCount);
            }

            recCount = Integer.parseInt(totalCount+"");

        }catch(Exception ex) {
            System.out.println("Exception occurred while search Index : " + ex/*, ex*/);
        }

        return recCount;
    }    


}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...