Стандартный алгоритм или шаблон для чтения больших данных параллельно с использованием JDBC в приложении Java - PullRequest
0 голосов
/ 10 мая 2018

Ниже приведена простая программа, которая читает данные из MySQL и сохраняет их в файле CSV.Если запрос возвращает более 10 миллионов записей, он будет медленным.

Я полностью понимаю, что для параллельной работы нам нужно выполнить такой процесс, как

  1. Получить количество записей из запроса (выберите * из пользователей)
  2. Затем разбейте запрос на параллельный блок с соответствующим (выберите * из пользователей, где состояние = 'CA')
  3. Затем данные можно читать параллельно в 50 потоках илираспределены по процессу.

В Apache spark используется partition_column с нижним верхним пределом и номером раздела, как показано ниже.

Мне интересно узнать, существует ли способ / шаблон /алгоритм, который можно использовать в приложении Non-Spark для параллельного получения огромных данных.Однако я буду смотреть на код Spark для реализации ниже.

https://medium.com/@radek.strnad/tips-for-using-jdbc-in-apache-spark-sql-396ea7b2e3d3

spark.read("jdbc")
  .option("url", url)
  .option("dbtable", "pets")
  .option("user", user)
  .option("password", password)
  .option("numPartitions", 10)
  .option("partitionColumn", "owner_id")
  .option("lowerBound", 1)
  .option("upperBound", 10000)
  .load()

SELECT * FROM pets WHERE owner_id >= 1 and owner_id < 1000
SELECT * FROM pets WHERE owner_id >= 1000 and owner_id < 2000
SELECT * FROM pets WHERE owner_id >= 2000 and owner_id < 3000

Простой код MySQL для чтения и хранения данных в CSV-файле

public static void main(String[] args)
{
    try
    {
        String myDriver = "org.gjt.mm.mysql.Driver";
        String myUrl = "jdbc:mysql://localhost/test";
        Class.forName(myDriver);
        Connection conn = DriverManager.getConnection(myUrl, "root", "");
        String query = "SELECT * FROM users";
        Statement st = conn.createStatement();
        ResultSet rs = st.executeQuery(query);
        StringBuilder sb = new StringBuilder();
        while (rs.next())
        {
            int id = rs.getInt("id");
            String firstName = rs.getString("first_name");
            String lastName = rs.getString("last_name");
            Date dateCreated = rs.getDate("date_created");
            boolean isAdmin = rs.getBoolean("is_admin");
            int numPoints = rs.getInt("num_points");
            sb.append(String.format("%s, %s, %s, %s, %s, %s\n", id, firstName, lastName, dateCreated, isAdmin, numPoints));
        }

        try (FileOutputStream oS = new FileOutputStream(new File("aFile.csv"))) {
            oS.write(sb.toString().getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }
        st.close();
    }
    catch (Exception e)
    {
        System.err.println("Got an exception! ");
        System.err.println(e.getMessage());
    }
}

1 Ответ

0 голосов
/ 10 мая 2018

Это не совсем точно отвечает на ваш вопрос, но SELECT DATA INTO OUTFILE может помочь вам быстро экспортировать ваши данные.

Вот пример команды для создания файла CSV в вашем случае,

SELECT * 
  INTO OUTFILE '/some/path/to/users.csv'
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
 LINES TERMINATED BY '\n'
  FROM users;

Это использует быстрый путь для записи данных в вашу файловую систему и может быть быстрее, чем ваш многопоточный подход. конечно легче программировать.

Всегда полезно начинать такой запрос большого объема с SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;, чтобы избежать блокировки вставок и обновлений таблицы.

Если вы будете использовать несколько потоков Java для извлечения ваших данных, я предлагаю вам использовать эту стратегию:

  1. Перед порождением потоков определите наибольшее значение id, выполнив этот запрос: SELECT MAX(id) FROM users;

  2. Решите, сколько потоков вы будете создавать. Слишком много потоков будет контрпродуктивным, потому что они будут перегружать ваш сервер MySQL. Пятьдесят потоков - это далеко слишком много соединений с вашим сервером MySQL. Используйте четыре или восемь.

  3. Дайте каждому потоку свой сегмент id значений для извлечения. Например, если у вас есть десять миллионов строк и четыре потока, сегменты будут [1-2500000], [2500001-5000000], [5000001-7500000] и [7500001-10000000].

  4. В каждом потоке откройте соединение jdbc с MySQL и выполните WHERE id BETWEEN segmentstart AND segmentfinish, чтобы выбрать правильные строки. (Соединения MySQL не являются потокобезопасными объектами).

  5. Поставьте SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; перед вашими запросами SELECT.

id является (предположительно) первичным ключом таблицы users, поэтому фильтрация WHERE с его использованием будет очень эффективной.

...