Загрузите ResultSet запроса в фрейм данных, используя Spark / java - PullRequest
0 голосов
/ 09 июля 2020

Я хочу загрузить результирующий набор запроса на выборку в Dataframe Spark.

Я использую следующий код:

public static void func (Dataset <Row> df){
    df.repartition(20); //one connection per partition, see below

    df.foreachPartition((Iterator<Row> t) -> {
        Connection conn = DriverManager.getConnection("url",
                "root", "");

        conn.setAutoCommit(true);
        Statement statement = conn.createStatement();

        final int batchSize = 100000;
        int i = 0;
        while (t.hasNext()) {
            Row row = t.next();
            try {

              ResultSet query =   statement.executeQuery("SELECT * FROM zones WHERE zones.id IN ("
                        +"'"  + row.getAs("idZones")
                        + "'"+ ")  ");

     

            }  catch (SQLException e) {
                e.printStackTrace();
            } finally {
              
            }
        }

        statement.close();
        conn.close();


    });

}

Есть возможность загрузить ResultSet в фрейм данных?

Мне нужна ваша помощь

Спасибо.

1 Ответ

0 голосов
/ 09 июля 2020

Если я правильно понял ваш вопрос, вы хотите загрузить таблицу SQL во фрейм данных. Для этого вам необходимо сделать следующее:

  1. Создать объект sparkSession.
  2. Поместить ваше соединение JDB C в объект Properties.
  3. Загрузить таблицу SQL методом чтения.
  4. Вы можете применить соответствующий фильтр на основе загруженного фрейма данных.

Пожалуйста, найдите ниже код как образец для этого.

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

import java.util.Properties;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ReadFromSQLTable {
    public static void main(String[] args) {
        String applicationName = ReadFromSQLTable.class.getName();
        SparkConf sparkConf = new SparkConf().setAppName(applicationName).setMaster("local[2]");
        // using Dataset<Row>
        SparkSession sparkSession = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();


        Properties connectionProperties = new Properties();

        connectionProperties.put("user", "root"); // user name of your SQL database
        connectionProperties.put("password", "password"); // password of SQL
        connectionProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver");
// Name of the database that i am interacting with is `test`. You will find this as part of URL.
// Name of table that I want to load is the `employee`
        Dataset<Row> employeeDetail = sparkSession.read().jdbc("jdbc:mysql://127.0.0.1:3306/test",
                "employee", connectionProperties);

        log.error("Printing table detail");
        employeeDetail.show(); // to show the dataset loaded on the console
        long count = employeeDetail.count();
        System.out.println("The count is = " + count);
        Dataset<Row> employeeDetail2 = employeeDetail.filter("employee_number < 2");
        employeeDetail2.show();
}

Вы можете применить к этому фрейму данных любую операцию, такую ​​как фильтр, выбор или любую другую SQL операцию.

Я запускаю этот код в моей локальной системе. Я добавил комментарий в код, чтобы вы могли легко его понять. Дайте мне знать, если у вас есть какие-либо сомнения.

Надеюсь, это должно дать вам четкое представление о том, как начать загрузку таблицы SQL как фрейма данных.

...