Как добавить 1 миллиард строк в Impala? - PullRequest
0 голосов
/ 10 июня 2019

Я изучаю Impala для POC, моя задача - добавить 1 миллиард строк и проверить, насколько быстро мы можем вставлять и извлекать данные.Я создал таблицу, в которой 30 строк, половина из которых - строка, 2 - временные метки, а остальные - целые числа.Добавление 1 миллиона строк заняло около 3 часов, что очень медленно для производства.

Я написал небольшой Java-код, куда вставляю данные в пакетном выражении по 1000, код очень медленный и занял почти 13 часовпросто добавьте 4 миллиона строк.


    private static Connection connectViaDS() throws Exception {
        Connection connection = null;

        Class.forName("com.cloudera.impala.jdbc41.Driver");

        connection = DriverManager.getConnection(CONNECTION_URL);

        return connection;

    }

    private static void writeInABatchWithCompiledQuery(int records) {

        int protocol_no = 233,s_port=20,d_port=34,packet=46,volume=58,duration=39,pps=76,
                bps=65,bpp=89,i_vol=465,e_vol=345,i_pkt=5,e_pkt=54,s_i_ix=654,d_i_ix=444,_time=1000,flow=989;

        String s_city = "Mumbai",s_country = "India", s_latt = "12.165.34c", s_long = "39.56.32d",
                s_host="motadata",d_latt="29.25.43c",d_long="49.15.26c",d_city="Damouli",d_country="Nepal";

        long e_date= 1275822966, e_time= 1370517366;

        PreparedStatement preparedStatement;

       // int total = 1000000*1000;

        int total = 1000000*1000;

        int counter =0;

        Connection connection = null;
        try {
            connection = connectViaDS();



            preparedStatement = connection.prepareStatement(sqlCompiledQuery);


            Timestamp ed = new Timestamp(e_date);
            Timestamp et = new Timestamp(e_time);

            while(counter <total) {

                for (int index = 1; index <= 1000; index++) {

                    counter++;

                    preparedStatement.setString(1, "s_ip" + String.valueOf(index));
                    //   System.out.println(1);
                    preparedStatement.setString(2, "d_ip" + String.valueOf(index));
                    //   System.out.println(2);
                    preparedStatement.setInt(3, protocol_no + index);
                    //   System.out.println(3);
                    preparedStatement.setInt(4, s_port + index);
                    //  System.out.println(4);
                    preparedStatement.setInt(5, d_port + index);
                    //  System.out.println(5);
                    preparedStatement.setInt(6, packet + index);
                    //  System.out.println(6);
                    preparedStatement.setInt(7, volume + index);
                    //  System.out.println(7);
                    preparedStatement.setInt(8, duration + index);
                    //   System.out.println(8);
                    preparedStatement.setInt(9, pps + index);
                    //  System.out.println(9);
                    preparedStatement.setInt(10, bps + index);
                    //  System.out.println(10);
                    preparedStatement.setInt(11, bpp + index);
                    //   System.out.println(11);
                    preparedStatement.setString(12, s_latt + String.valueOf(index));
                    //  System.out.println(12);
                    preparedStatement.setString(13, s_long + String.valueOf(index));
                    //  System.out.println(13);
                    preparedStatement.setString(14, s_city + String.valueOf(index));
                    //  System.out.println(14);
                    preparedStatement.setString(15, s_country + String.valueOf(index));
                    //  System.out.println(15);
                    preparedStatement.setString(16, d_latt + String.valueOf(index));
                    //   System.out.println(16);
                    preparedStatement.setString(17, d_long + String.valueOf(index));
                    //   System.out.println(17);
                    preparedStatement.setString(18, d_city + String.valueOf(index));
                    //   System.out.println(18);
                    preparedStatement.setString(19, d_country + String.valueOf(index));
                    //   System.out.println(19);
                    preparedStatement.setInt(20, i_vol + index);
                    //   System.out.println(20);
                    preparedStatement.setInt(21, e_vol + index);
                    //   System.out.println(21);
                    preparedStatement.setInt(22, i_pkt + index);
                    //   System.out.println(22);
                    preparedStatement.setInt(23, e_pkt + index);
                    //   System.out.println(23);
                    preparedStatement.setInt(24, s_i_ix + index);
                    //   System.out.println(24);
                    preparedStatement.setInt(25, d_i_ix + index);
                    //    System.out.println(25);
                    preparedStatement.setString(26, s_host + String.valueOf(index));
                    //   System.out.println(26);
                    preparedStatement.setTimestamp(27, ed);
                    //   System.out.println(27);
                    preparedStatement.setTimestamp(28, et);
                    //   System.out.println(28);
                    preparedStatement.setInt(29, _time);
                    //   System.out.println(29);
                    preparedStatement.setInt(30, flow + index);
                    //   System.out.println(30);
                    // System.out.println(index);
                    preparedStatement.addBatch();

                }
                preparedStatement.executeBatch();

                preparedStatement.clearBatch();

                //System.out.println("Counter = "+counter);


            }


        //    long start = System.currentTimeMillis();
        //    int[] inserted = preparedStatement.executeBatch();
         //   long end = System.currentTimeMillis();

           // System.out.println("total time taken to insert the batch = " + (end - start) + " ms");
            //System.out.println("total time taken = " + (end - start)/records + " s");
            //System.out.println("Inserted: "+inserted);



        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

    }

Данные обновляются со скоростью улитки, я попытался увеличить размер пакета, но при этом он уменьшил скорость.Я не знаю, неверен ли мой код, или мне нужно настроить Impala для лучшей производительности.Пожалуйста, руководство.

Я использую ВМ для тестирования, вот другие подробности -

System.

Os - Ubuntu 16
RAM - 12 gb
Cloudera - CDH 6.2
Impala daemon limit - 2 gb
Java heap size impala daemon - 500mb
HDFS Java Heap Size of NameNode in Bytes - 500mb.

Пожалуйста, дайте мне знать, если потребуется более подробная информация.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...