Могу ли я пометить POJO как объект Hibernate, в то время как этот POJO используется Apache Spark? - PullRequest
0 голосов
/ 22 марта 2019

В проекте, с которым я сейчас работаю, есть несколько POJO-файлов, которые используются Spark следующим образом:

    JavaRDD<MyPojo> = ...
    sqlContext.createDataFrame(rdd, MyPojo.class);

Однако мне также необходимо загрузить таблицу MySQL в этот POJO, используя обычный код Java.Я могу использовать собственный SQL для загрузки и переноса данных с помощью POJO.Но мне также интересно, можно ли мне пометить этот POJO как Hibernate Entity.

Заранее спасибо.

1 Ответ

0 голосов
/ 22 марта 2019

Независимо от того, используете ли вы RDD или набор данных / фрейм данных POJO, аннотации Hibernate / JPA не должны мешать.

Однако будьте осторожны с тем, где работает ваш код, когда вы "играете" с Spark.

Если вы хотите, чтобы ваш исполнитель общался с базой данных через Hibernate, тогда ему придется открыть сеанс Hibernate.Я не знаю (или не думаю), что сеанс Hibernate может быть сериализуемым и использоваться совместно вашим драйвером и исполнителем.

Если у вас есть сеанс Hibernate в вашем драйвере и вы хотите сохранить данные на этом этапе,помните, что вам придется передавать все данные от исполнителей к драйверу (например, с помощью collect() -подобного метода).

Это может немного изменить архитектуру вашего приложения, но я бы подумалвызов write():

df.write()
    .mode(SaveMode.Overwrite)
    .jdbc(dbConnectionUrl, "ch02", prop);

Не то, что это делается с кадром данных, который является Dataset<Row>, а не Dataset<MyPojo> (и не RDD).Полный пример:

package net.jgp.books.spark.ch02.lab100_csv_to_db;

import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

/**
 * CSV to a relational database.
 * 
 * @author jgp
 */
public class CsvToRelationalDatabaseApp {

  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    CsvToRelationalDatabaseApp app = new CsvToRelationalDatabaseApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("CSV to DB")
        .master("local")
        .getOrCreate();

    // Step 1: Ingestion
    // ---------

    // Reads a CSV file with header, called authors.csv, stores it in a
    // dataframe
    Dataset<Row> df = spark.read()
        .format("csv")
        .option("header", "true")
        .load("data/authors.csv");

    // Step 2: Transform
    // ---------

    // Creates a new column called "name" as the concatenation of lname, a
    // virtual column containing ", " and the fname column
    df = df.withColumn(
        "name",
        concat(df.col("lname"), lit(", "), df.col("fname")));

    // Step 3: Save
    // ----

    // The connection URL, assuming your PostgreSQL instance runs locally on the
    // default port, and the database we use is "spark_labs"
    String dbConnectionUrl = "jdbc:postgresql://localhost/spark_labs";

    // Properties to connect to the database, the JDBC driver is part of our
    // pom.xml
    Properties prop = new Properties();
    prop.setProperty("driver", "org.postgresql.Driver");
    prop.setProperty("user", "jgp");
    prop.setProperty("password", "Spark<3Java");

    // Write in a table called ch02
    df.write()
        .mode(SaveMode.Overwrite)
        .jdbc(dbConnectionUrl, "ch02", prop);

    System.out.println("Process complete");
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...