Как синхронизировать c очень большую postgres таблицу базы данных с веб-программой, используя вызов REST API? - PullRequest
0 голосов
/ 25 марта 2020

Вот несколько вещей, которые вы должны знать:

  1. У меня есть postgres база данных, в которой есть таблица 'account', которая содержит около миллиона записей.
  2. У меня есть учетная запись Vbout, где мне нужно синхронизировать c записи из таблицы «учетная запись» как контакты, используя Vbout Rest API (https://developers.vbout.com/docs/1_0/#emailmarketing_addcontact).
  3. Я создал python скрипт для генерации случайных данных и заполнения строк в таблице postgres для тестирования.

Я написал Java программу для этого syn c:

  1. Моя java программа сначала подсчитывает количество записей в базе данных, а затем делит записи на диапазоны (скажем, есть 100 записей, и я хочу запустить 5 потоков, диапазоны 1-20, 21-40, 41- 60, 61-80, 81-100) и обрабатывает каждый диапазон с использованием отдельного потока, чтобы ускорить процесс.
  2. Каждый поток извлекает запись и создает объект для записи и помещает ее в Очередь, которую я реализовал, используя Arraylist.

  3. У меня есть e 100 Темы для публикации данных в Vbout. Эти потоки извлекают объект из очереди и делают api-вызов rest vbout, чтобы создать там контакт.

Вот проблема, с которой я сталкиваюсь:

  1. Программа работает нормально для 10000 записей, но не для миллиона записей.
  2. Когда я заполняю таблицу миллионом записей, PgAdmin и моя программа аварийно завершают работу.
  3. При миллионов записей, моя программа показывает ошибку соединения, а PgAdmin показывает ошибку потери соединения. Я не могу решить это.
    import org.apache.hc.client5.http.classic.HttpClient;
    import org.apache.hc.client5.http.classic.methods.HttpPost;
    import org.apache.hc.client5.http.entity.UrlEncodedFormEntity;
    import org.apache.hc.client5.http.impl.classic.HttpClients;
    import org.apache.hc.core5.http.HttpResponse;
    import org.apache.hc.core5.http.NameValuePair;
    import org.apache.hc.core5.http.message.BasicNameValuePair;
    
    import java.sql.*;
    import java.util.ArrayList;
    import java.util.List;
    
    
    //DB Controller Class used to get Database Connection
    //Have made it generic for future use
    class DBController {
    
        private static final String dbName = "dbname";
        private static final String username = "postgres";
        private static final String password = "password";
        private static final String host = "localhost";
    
    
        public static Connection getDBConnection() {
            try {
                Class.forName("org.postgresql.Driver");
                Connection connection = DriverManager
                        .getConnection("jdbc:postgresql://" + host + "/" + dbName,
                                username, password);
    
    
                return connection;
            } catch (Exception e) {
                System.out.println("Connection To Database failed due to following reasons:");
                e.printStackTrace();
            }
    
            return null;
        }
    
    }
    
    
    /*
     * Range Objects are used to divide all the records of the table
     * into parts. Then all the parts are processed by different threads to make the sync faster.
     * */
    class Range {
        int from;
        int to;
    }
    
    //Vbout Contact Objects store the property for posting into the Vbout API
    class VboutContact {
        String firstName;
        String lastName;
        String email;
        String phone;
    }
    
    
    //This is the main class and this should be run only once (THE FIRST TIME) to do an initial sync
    public class FirstSync {
    
       static int count=0;
        static int testCount=0;
    
        private static final String listId = "######";
        private static final String vboutApiKey = "#######";
    
        //Default Vbout Threads
        private static final int vboutThreads = 500;
    
        //Default Database Threads (eg. 5 Threads = 5 Range Objects)
        private static final int dbThreads = 100;
    
        private static int countRecords = 0;
    
    
        //We maintain a queue of contacts that every thread add the records into
        private static volatile ArrayList<VboutContact> contactQueue = new ArrayList<VboutContact>();
    
    
        public static void main(String args[]) {
    
            System.out.println("Connecting to database...");
    
    
            System.out.println("Connected to database!");
    
    
            countRecords = countRecords();
            doSync();
    
    
            Runnable r1 = new Runnable() {
                @Override
                public void run() {
                    doPostVbout();
                }
            };
    
            //Runs vboutThreads number of threads for posting data into vbout default (3)
            for (int i = 0; i < vboutThreads; i++) {
                new Thread(r1).start();
            }
    
        }
    
        public static void doPostVbout() {
            while (true) {
                try {
                    if (contactQueue.size() > 0) {
                        VboutContact vboutContact = null;
                        synchronized (contactQueue) {
                            vboutContact = contactQueue.remove(0);
                        }
                        System.out.println("Processing Vbout Contact : " + vboutContact.email+" - Name : "+vboutContact.firstName+" "+vboutContact.lastName+" Phone : "+vboutContact.phone);
                        HttpClient httpclient = HttpClients.createDefault();
                        HttpPost httppost = new HttpPost("https://api.vbout.com/1/emailmarketing/addcontact.json?key=" + vboutApiKey);
                        List<NameValuePair> params = new ArrayList<NameValuePair>();
                        params.add(new BasicNameValuePair("email", vboutContact.email));
                        params.add(new BasicNameValuePair("status", "Active"));
                        params.add(new BasicNameValuePair("listid", listId));
                        params.add(new BasicNameValuePair("fields[161784]", vboutContact.firstName));
                        params.add(new BasicNameValuePair("fields[161785]", vboutContact.lastName));
                        params.add(new BasicNameValuePair("fields[161787]", vboutContact.phone));
                        httppost.setEntity(new UrlEncodedFormEntity(params));
                        HttpResponse response = httpclient.execute(httppost);
                        System.out.println(response.toString());
                        count++;
                        System.out.println("\n\n\nProgress : "+count+" / "+countRecords);
                    }
                } catch (Exception e) {
                }
                System.out.println("Total Count Records: "+countRecords+" - Total Fetched Records"+testCount);
            }
        }
    
        public static boolean doSync() {
            boolean syncResultBool = false;
            try {
    
                //Counts Total Reords in the database
                int count = countRecords;
    
                //Breaks the total number of records into ranges
                ArrayList<Range> ranges = getRanges(count);
    
                //Runs Thread of each range
                for (int i = 0; i < ranges.size(); i++) {
                    Range thisRange = ranges.get(i);
                    Runnable r1 = new Runnable() {
                        @Override
                        public void run() {
                            doSyncThread(thisRange);
                        }
                    };
                    Thread t1 = new Thread(r1);
                    t1.start();
                }
            } catch (Exception e) {
                System.out.println("Sync Crashed Due To Following Issues");
            }
            return syncResultBool;
        }
    
    
        public static void doSyncThread(Range range) {
            try {
                System.out.println("Sync Running for " + range.from + " : " + range.to);
                Connection connection=DBController.getDBConnection();
                PreparedStatement preparedStatement = connection.prepareStatement("SELECT id,firstname,lastname,phone,personemail FROM public.account LIMIT ? OFFSET ?");
                preparedStatement.setInt(1, (range.to - range.from)+1);
                System.out.println("\n\n\n\n\nLIMIT "+(range.to - range.from+1)+"OFFSET "+(range.from-1)+"\n\n\n");
                preparedStatement.setInt(2, range.from-1);
    
                ResultSet rs = preparedStatement.executeQuery();
                while (rs.next()) {
    
                    VboutContact vboutContact = new VboutContact();
                    vboutContact.firstName = rs.getString(2);
                    vboutContact.lastName = rs.getString(3);
                    //vboutContact.phone = rs.getString(4);
                    vboutContact.phone = "9999999999";
                    vboutContact.email = rs.getString(5).toLowerCase()+"@example.com";
    
                    System.out.println("Adding Record to Queue : " + vboutContact.email);
                    //Adds records to queue
                    synchronized (contactQueue) {
                        contactQueue.add(vboutContact);
                        testCount++;
                    }
                }
                rs.close();
                preparedStatement.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Sync Crashed Due To Following Issues");
                e.printStackTrace();
            }
        }
    
        public static int countRecords() {
            int count = 0;
            try {
    
                Connection connection=DBController.getDBConnection();
                // make sure autocommit is off
                connection.setAutoCommit(false);
                Statement st = connection.createStatement();
                st.setFetchSize(100);
                ResultSet rs = st.executeQuery("select count(id) from account;");
                while (rs.next()) {
                    count = rs.getInt(1);
                    System.out.print("a row was returned.");
                }
                rs.close();
                st.close();
                connection.close();
    
            } catch (Exception e) {
                System.out.println("Sync Crashed Due To Following Issues");
                e.printStackTrace();
            }
            System.out.println("Total Records : " + count);
            countRecords=count;
            return count;
        }
    
        public static ArrayList<Range> getRanges(int count) {
            ArrayList<Range> arrayList = new ArrayList<Range>();
            int threads = dbThreads;
            int parts = count / threads;
            float partf = ((float) count / (float) threads) - (float) parts;
            int temp = Math.round(partf * threads);
            int d = parts;
            int t = 0;
            for (int i = 2; i <= threads + 1; i++) {
                Range range = new Range();
                range.from = t + 1;
                t = (i - 1) * d;
                range.to = t;
                range.to = (i == threads + 1) ? t + temp : t;
                arrayList.add(range);
            }
            return arrayList;
        }
    
    }
    
    

1 Ответ

0 голосов
/ 25 марта 2020

Миллион записей даже не «большой» для PostgreSQL, а тем более «очень большой». Использование 100 потоков для чтения миллиона записей из базы данных - это проблема, а не решение. Что заставило вас поверить, что это было бы полезно? Возможно, вы можете полезно использовать потоки для vbout (я бы не знал), но это не значит, что вам также нужно использовать их для PostgreSQL.

Вы говорите, что PgAdmin потерпел крах, но вы не описали любое использование PgAdmin. Что ты делал с этим? Каково было точное сообщение об ошибке?

Вы говорите, что ваша java программа потерпела крах. Какое именно сообщение об ошибке было выдано PostgreSQL?

Хороший шаг для отладки вашего кода - прочитать ваши сообщения об ошибках . Если вам нужна помощь в их интерпретации, вам нужно показать нам ваши сообщения об ошибках .

...