Многопоточность с использованием службы Executor обрабатывает последовательно, а не одновременно - PullRequest
0 голосов
/ 31 мая 2018

У меня xml-запросы, сгенерированные в одной таблице.Мне нужно вызвать внешний веб-сервис, читая запросы из таблицы.Я хочу отправить несколько запросов одновременно, используя многопоточную службу исполнителя.Код вызывает веб-сервис последовательно вместо параллельной обработки.Любые предложения будут полезны.

Ниже приведен код:

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.io.input.ReaderInputStream;

import javax.xml.soap.*;

public class SoapClientSVAPI2 {

    public static void main(String args[]) {

        SOAPConnectionFactory soapConnectionFactory= null;
        SOAPConnection soapConnection = null;
        Connection conn= null;
        Statement s = null;
        PreparedStatement preparedStatement = null;
        ExecutorService executor = Executors.newFixedThreadPool(10);
        String soapEndpointUrl = "XXX";
        String reqid;
        Clob payload= null;


        try {

        Class.forName("oracle.jdbc.driver.OracleDriver");

        conn =  DriverManager.getConnection
              ("jdbc:oracle:thin:@XXX:1521:XXX", "XXX", "XX");
         s = conn.createStatement();

         // Create SOAP Connection
         soapConnectionFactory = SOAPConnectionFactory.newInstance();
         soapConnection = soapConnectionFactory.createConnection();

       String migTable = "SELECT req_id,payload,attr_9 FROM tableX WHERE ATTR_9 = \'READY\' ORDER BY dbms_random.value";
       s.execute(migTable);

       String updateTableSQL = "UPDATE tableX SET RESPONSE = ? , attr_9 = ? WHERE REQ_ID = ?";
       preparedStatement = conn.prepareStatement(updateTableSQL);

       ResultSet rs = s.getResultSet();

       while((rs!=null) && (rs.next()))
       {
           payload = rs.getClob(2);
           reqid = rs.getString(1);

         final class CallWebService implements Callable<String[]> {

               String soapEndpointUrl = "xxx";
               String reqid;
               Connection conn1 = null;
               PreparedStatement prepstmt = null;
               SOAPConnection soapConn = null;
               Clob payload= null;

                  public CallWebService( String reqid, Clob payload, Connection conn1,PreparedStatement prepstmt,  SOAPConnection soapConn ) {
                       this.reqid = reqid;
                       this.payload = payload;
                       this.conn1 = conn1;
                       this.prepstmt = prepstmt;
                       this.soapConn = soapConn;
                   }

                  @Override
               public String[] call() throws Exception {

                ByteArrayOutputStream baos = null;
                String svresponse = "";

                try {

                     String threadName = Thread.currentThread().getName();

                     System.out.println("Reqid: "+reqid+ "Thread name:"+ threadName);

                       Reader ir = new BufferedReader(payload.getCharacterStream());
                       InputStream is = new ReaderInputStream(ir,"UTF-8");
                       SOAPMessage inpRequest = MessageFactory.newInstance().createMessage(null, is);


                       SOAPMessage soapResponse = soapConn.call(inpRequest, soapEndpointUrl);

                       if (soapResponse != null)
                       {
                                baos = new ByteArrayOutputStream();
                                soapResponse.writeTo(baos); 
                                svresponse = baos.toString();
                       }

                     }

                   catch (Exception e) {
                        e.printStackTrace();
                       svresponse = "ERROR: " + e.getMessage();
                   }

                  finally 
                     {
                         if (baos != null) 
                         {
                             try 
                             {
                                 baos.close();
                             } 
                             catch (IOException ioe) 
                             {

                                ioe.printStackTrace();
                             }
                          }
                       }

                     return new String[] { reqid, svresponse};
              }
         };
     //calling webservice      
     if(reqid != null & !reqid.equals(""))
     {
           CallWebService cweb = new CallWebService(reqid,payload,conn,preparedStatement,soapConnection);
           Future<String[]> future = executor.submit(cweb);
           String response = "";
           String reqidout = "";
           String status = "FAILURE";
           try {
               // Wait 10s for response.
               reqidout = future.get(10, TimeUnit.SECONDS)[0];
               response = future.get(10, TimeUnit.SECONDS)[1];

           } catch (InterruptedException e) {
               e.printStackTrace();
               response = "InterruptedException";
               future.cancel(true);
           } catch (ExecutionException e) {
               e.printStackTrace();
               response = "ExecutionException";
               future.cancel(true);
           } catch (TimeoutException e) {
               e.printStackTrace();
               response = "TimeoutException";
               future.cancel(true);
           }

            if(response != null && response.indexOf("ReturnCode") > 0)
            {
                status = "SUCCESS";
            }

           preparedStatement.setString(1, response);
           preparedStatement.setString(2, status);
           preparedStatement.setString(3, reqidout);

           // execute update SQL statement
           preparedStatement.executeUpdate();

         }
       }

    }  

        catch (Exception e) {
            System.out.println("\nError occurred while sending SOAP Request to Server!\n");
            e.printStackTrace();
        }

        finally {

            System.out.println("Time end"+new Date());

          executor.shutdown();
          try {
            executor.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

            if (soapConnection != null && conn != null)
            {
                try {
                    soapConnection.close();
                    s.close();
                    conn.close();
                } catch (SOAPException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (SQLException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

    }

} 

1 Ответ

0 голосов
/ 31 мая 2018

это потому, что вы отправляете задание и заставляете основной поток ждать 10 секунд времени ожидания перед отправкой следующего задания.

Используйте future.get () после завершения отправки всех ваших работ.

Мои 2 цента, я также настоятельно рекомендую рефакторинг вашего кода, чтобы иметь методы с одной ответственностью с меньшей связью (только необходимые параметры метода).Например, разделить чтение из БД, вызвать веб-сервис и вставить ответ в БД и т. Д. Как отдельные методы.Это должно позволить вам лучше находить и решать такие проблемы секвенирования.

...