Подключение к Hive из Spark без использования "hive-site.xml" - PullRequest
0 голосов
/ 15 января 2019

Есть ли способ подключиться к Hive из Spark без использования "hive-site.xml"?

 SparkLauncher sl = new SparkLauncher(evnProps);
        sl.addSparkArg("--verbose");
        sl.addAppArgs(appArgs);
        sl.addFile(evnProps.get(KEY_YARN_CONF_DIR) + "/hive-site.xml");

Мы передаем "hive-site.xml" в SparkLauncher. Я хочу удалить зависимость от "hive-site.xml", введите код здесь.

1 Ответ

0 голосов
/ 15 января 2019

Spark SQL поддерживает чтение и запись данных, хранящихся в Apache Hive. Однако, поскольку Hive имеет большое количество зависимостей, эти зависимости не включены в дистрибутив Spark по умолчанию. Если в пути к классам можно найти зависимости Hive, Spark загрузит их автоматически. Обратите внимание, что эти зависимости Hive также должны присутствовать на всех рабочих узлах, поскольку для доступа к данным, хранящимся в Hive, им потребуется доступ к библиотекам сериализации и десериализации Hive (SerDes).

Настройка Hive выполняется путем размещения файла hive-site.xml, core-site.xml (для настройки безопасности) и hdfs-site.xml (для конфигурации HDFS) в файле conf /.

При работе с Hive необходимо создать экземпляр SparkSession с поддержкой Hive, в том числе с подключением к постоянному метасольве Hive, поддержкой уровней Hive и пользовательских функций Hive. Пользователи, у которых нет существующего развертывания Hive, могут по-прежнему включать поддержку Hive. Если hive-site.xml не настроен, контекст автоматически создает metastore_db в текущем каталоге и создает каталог, настроенный с помощью spark.sql.warehouse.dir, который по умолчанию соответствует каталогу spark-warehouse в текущем каталоге, который используется приложением Spark. запущен Обратите внимание, что свойство hive.metastore.warehouse.dir в hive-site.xml устарело с версии Spark 2.0.0. Вместо этого используйте spark.sql.warehouse.dir, чтобы указать расположение базы данных по умолчанию на складе. Вам может потребоваться предоставить права на запись пользователю, который запускает приложение Spark.

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static class Record implements Serializable {
  private int key;
  private String value;

  public int getKey() {
    return key;
  }

  public void setKey(int key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}

// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
    (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
    Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");

// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...
...