Spark, как читать из нескольких кластеров Elastic Search - PullRequest
0 голосов
/ 02 июля 2018

Мне нужно прочитать данные из двух разных кластеров Elastic Search. один для журналов и один для данных о продуктах, и я попытался указать sparkConf() при создании SparkSession, но, похоже, он работает только с первым созданным SparkSession

val config1 = new SparkConf().setAppName("test")
  .set("spark.driver.allowMultipleContexts", "true")
  .set("es.index.auto.create", "true")
  .set("es.nodes.discovery", "false")
  .set("es.nodes.wan.only", "true")
  .set("es.nodes.client.only", "false")
  .set("es.nodes", s"$esNode1:$esPort1")

val config2 = new SparkConf().setAppName("test")
  .set("spark.driver.allowMultipleContexts", "true")
  .set("es.index.auto.create", "true")
  .set("es.nodes.discovery", "false")
  .set("es.nodes.wan.only", "true")
  .set("es.nodes.client.only", "false")
  .set("es.nodes", s"$esNode2:$esPort2")

val session1 = SparkSession.builder.master('local').config(config1).getOrCreate()
val session2 = SparkSession.builder.master('local').config(config2).getOrCreate()

session1.read.format("org.elasticsearch.spark.sql").load(path)
session2.read.format("org.elasticsearch.spark.sql").load(path)

кажется, что spark не поддерживает несколько сессий с одним и тем же форматом, потому что я использую один и тот же SparkSession с Mysql (jdbc), и он работает хорошо. Есть ли альтернативный способ получения данных из нескольких кластеров ElasticSearch?

1 Ответ

0 голосов
/ 02 июля 2018

Создайте только один сеанс для каждого приложения Spark. Затем прочитайте 2 кадра данных следующим образом:

  val config = new SparkConf().setAppName("test")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("es.index.auto.create", "true")
    .set("es.nodes.discovery", "false")
    .set("es.nodes.wan.only", "true")
    .set("es.nodes.client.only", "false")

  val session = SparkSession.builder.master("local").config(config).getOrCreate

  val df1 = session.read.format("org.elasticsearch.spark.sql")
    .option("es.nodes", s"$esNode1:$esPort1").load(path)

  val df2 = session.read.format("org.elasticsearch.spark.sql")
    .option("es.nodes", s"$esNode2:$esPort2").load(path)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...