Как добавить Spark Dataframe в пакет, используя PreparedStatement без итерации по строкам - PullRequest
0 голосов
/ 02 марта 2020

В настоящее время я использую приведенную ниже логику c для выполнения пакетной вставки для фрейма данных Spark, который я создаю после чтения потоковых данных в реальном времени из kafka topi c с использованием потоковых API Kafka Spark. Эти данные мне нужно загрузить в промежуточную таблицу DB2 в зависимости от размера пакета. Размер данных составляет тысячи транзакций в секунду, которые я потребляю из топи c.

Class DF_Creation{
.
DB2_CLASS.insert(DB2_Table, final_dataframe, batchSize);
.
}

Class DB2_CLASS{
.
public static void insert(String DB2_Table, Dataset<Row> final_dataframe, int batchSize){
CREATE DB2 Connection..Connection conn = ......
CREATE STATEMENT Statement stmt = conn.createStatement()) {
String truncate = "TRUNCATE TABLE DB2_Table IMMEDIATE";
stmt.execute(truncate);
final_dataframe.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
       String insertQuery = "INSERT INTO " + DB2_Table + " (COL1,COL2,COL3) VALUES (?, ?, ?) ";
       try (Connection conn = CREATE DB2 Connection
             PreparedStatement insertStmt = conn.prepareStatement(insertQuery)) {
            conn.setAutoCommit(false);
            try{
                            int cnt = 0;
                            while (rows.hasNext()) {
                            int idx = 0;
                            Row row = rows.next();
                    insertStmt.setString(++idx, row.getAs("COL1"));
                    insertStmt.setString(++idx, row.getAs("COL2"));
                    insertStmt.setString(++idx, row.getAs("COL3"));
                        insertStmt.addBatch();
                                            cnt++;
                    if (cnt >= batchSize) {
                        insertStmt.executeBatch();
                        conn.commit();
                        insertStmt.clearBatch();
                        cnt = 0;
                    }
                        }}catch{..}
        }
    }
}}              

Это влияет на производительность задания спарка, так как я перебираю каждую из строк, читая каждую из столбцы, чтобы создать пакет. Есть ли способ создать пакет напрямую, без итераций по строкам и столбцам.

Пожалуйста, предложите.

Спасибо

1 Ответ

0 голосов
/ 03 марта 2020

Вы можете сразу записать свой фрейм данных в таблицу назначения через опцию JDB C в программе записи искровых фреймов. Больше нет необходимости создавать подготовленный оператор, перебирая весь кадр данных построчно. Spark обрабатывает все это внутренне.

Вы можете написать:

import com.ibm.db2.jcc._

val jdbcUrl = "jdbc:db2://host:port/database_name"

val db2Properties = new Properties()

final_dataframe.write.mode("append").option("driver", "com.ibm.db2.jcc.DB2Driver").jdbc(jdbcUrl, table = "table_name", db2Properties)
...