Как создать общее соединение JDB C для использования на исполнителях? - PullRequest
0 голосов
/ 14 января 2020

Я создал одноэлементное соединение spark jdb c в драйвере и планирую использовать соединение в исполнителях. Я получаю ниже исключения. org. apache .spark.SparkException: задача не сериализуема

Внутри основного класса искры:

object ExecutorConnection {
      private var connection: Connection = null
      val url = prop.getProperty("url")
      val user = prop.getProperty("user")
      val pwd = prop.getProperty("password")
      val driver = prop.getProperty("driver")
      Class.forName(driver)
     def getConnection(url: String, username: String, password: String): Connection = synchronized {
        if (connection == null) {
          connection = DriverManager.getConnection(url, username, password)
          Class.forName(driver)
          connection.setAutoCommit(false)
        }
        connection
      }
      lazy val createConnection = getConnection(url, user, pwd)
    }

У меня есть несколько фреймов данных (df1, df2, df3) с другой схемой, где im Планирование создания соединения на уровне драйвера, сериализации соединения и использования его для всех информационных фреймов.

  df1.rdd.repartition(2).mapPartitions((d) => Iterator(d)).foreach { partition =>    
    val conn = ExecutorConnection.createConnection 
    var ps: PreparedStatement = null
    partition.grouped(1).foreach(batch => {
      batch.foreach { x =>
        {
          ps = conn.prepareStatement(SqlString)
          ps.addBatch()
          conn.commit()
        }
      }
    })
  }

Ответы [ 2 ]

2 голосов
/ 14 января 2020

Использование Набор данных.foreachPartition :

foreachPartition (f: (Iterator [T]) ⇒ Единица измерения): Единица измерения

Применяет функцию f к каждому разделу этого набора данных.

Этот трюк с объектом Scala - именно то, как вы получаете соединение один раз за задачу (и я думаю, что для каждого исполнителя).

df1.foreachPartition { vs =>
  // use the connection here
}

Используйте Гуава для кэша.

0 голосов
/ 15 января 2020

Re:

, где я планирую создать соединение на уровне драйвера и сериализовать соединение

Это не работает таким образом.

Вы должны создать соединения на исполнителя, иначе вы будете получать это исключение.

...