Ниже приведена простая программа, которая читает данные из MySQL и сохраняет их в файле CSV.Если запрос возвращает более 10 миллионов записей, он будет медленным.
Я полностью понимаю, что для параллельной работы нам нужно выполнить такой процесс, как
- Получить количество записей из запроса (выберите * из пользователей)
- Затем разбейте запрос на параллельный блок с соответствующим (выберите * из пользователей, где состояние = 'CA')
- Затем данные можно читать параллельно в 50 потоках илираспределены по процессу.
В Apache spark используется partition_column с нижним верхним пределом и номером раздела, как показано ниже.
Мне интересно узнать, существует ли способ / шаблон /алгоритм, который можно использовать в приложении Non-Spark для параллельного получения огромных данных.Однако я буду смотреть на код Spark для реализации ниже.
https://medium.com/@radek.strnad/tips-for-using-jdbc-in-apache-spark-sql-396ea7b2e3d3
spark.read("jdbc")
.option("url", url)
.option("dbtable", "pets")
.option("user", user)
.option("password", password)
.option("numPartitions", 10)
.option("partitionColumn", "owner_id")
.option("lowerBound", 1)
.option("upperBound", 10000)
.load()
SELECT * FROM pets WHERE owner_id >= 1 and owner_id < 1000
SELECT * FROM pets WHERE owner_id >= 1000 and owner_id < 2000
SELECT * FROM pets WHERE owner_id >= 2000 and owner_id < 3000
Простой код MySQL для чтения и хранения данных в CSV-файле
public static void main(String[] args)
{
try
{
String myDriver = "org.gjt.mm.mysql.Driver";
String myUrl = "jdbc:mysql://localhost/test";
Class.forName(myDriver);
Connection conn = DriverManager.getConnection(myUrl, "root", "");
String query = "SELECT * FROM users";
Statement st = conn.createStatement();
ResultSet rs = st.executeQuery(query);
StringBuilder sb = new StringBuilder();
while (rs.next())
{
int id = rs.getInt("id");
String firstName = rs.getString("first_name");
String lastName = rs.getString("last_name");
Date dateCreated = rs.getDate("date_created");
boolean isAdmin = rs.getBoolean("is_admin");
int numPoints = rs.getInt("num_points");
sb.append(String.format("%s, %s, %s, %s, %s, %s\n", id, firstName, lastName, dateCreated, isAdmin, numPoints));
}
try (FileOutputStream oS = new FileOutputStream(new File("aFile.csv"))) {
oS.write(sb.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
st.close();
}
catch (Exception e)
{
System.err.println("Got an exception! ");
System.err.println(e.getMessage());
}
}