В настоящее время я использую приведенную ниже логику c для выполнения пакетной вставки для фрейма данных Spark, который я создаю после чтения потоковых данных в реальном времени из kafka topi c с использованием потоковых API Kafka Spark. Эти данные мне нужно загрузить в промежуточную таблицу DB2 в зависимости от размера пакета. Размер данных составляет тысячи транзакций в секунду, которые я потребляю из топи c.
Class DF_Creation{
.
DB2_CLASS.insert(DB2_Table, final_dataframe, batchSize);
.
}
Class DB2_CLASS{
.
public static void insert(String DB2_Table, Dataset<Row> final_dataframe, int batchSize){
CREATE DB2 Connection..Connection conn = ......
CREATE STATEMENT Statement stmt = conn.createStatement()) {
String truncate = "TRUNCATE TABLE DB2_Table IMMEDIATE";
stmt.execute(truncate);
final_dataframe.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
String insertQuery = "INSERT INTO " + DB2_Table + " (COL1,COL2,COL3) VALUES (?, ?, ?) ";
try (Connection conn = CREATE DB2 Connection
PreparedStatement insertStmt = conn.prepareStatement(insertQuery)) {
conn.setAutoCommit(false);
try{
int cnt = 0;
while (rows.hasNext()) {
int idx = 0;
Row row = rows.next();
insertStmt.setString(++idx, row.getAs("COL1"));
insertStmt.setString(++idx, row.getAs("COL2"));
insertStmt.setString(++idx, row.getAs("COL3"));
insertStmt.addBatch();
cnt++;
if (cnt >= batchSize) {
insertStmt.executeBatch();
conn.commit();
insertStmt.clearBatch();
cnt = 0;
}
}}catch{..}
}
}
}}
Это влияет на производительность задания спарка, так как я перебираю каждую из строк, читая каждую из столбцы, чтобы создать пакет. Есть ли способ создать пакет напрямую, без итераций по строкам и столбцам.
Пожалуйста, предложите.
Спасибо