Spark: как распараллелить последующую конкретную работу на каждом разделе dataframe - PullRequest
0 голосов
/ 01 октября 2018

Приложение My Spark выглядит следующим образом:

1) выполнить большой запрос с помощью Spark SQL в фрейме данных «dataDF»

2) foreach-раздел, участвующий в «dataDF»:

2.1) получить связанный «отфильтрованный» фрейм данных, чтобы иметь только данные, связанные с разделом

2.2) выполнить определенную работу с этим «отфильтрованным» фреймом данных и записать вывод

Код выглядит следующим образом:

val dataSQL = spark.sql("SELECT ...")
val dataDF = dataSQL.repartition($"partition")

for {
  row <- dataDF.dropDuplicates("partition").collect
} yield {

   val partition_str : String = row.getAs[String](0)
   val filtered = dataDF.filter($"partition" .equalTo( lit( partition_str ) ) )

   // ... on each partition, do work depending on the partition, and write result on HDFS

   // Example :

   if( partition_str == "category_A" ){

       // do group by, do pivot, do mean, ...
       val x = filtered
         .groupBy("column1","column2")
         ...

       // write final DF
       x.write.parquet("some/path")

   } else if( partition_str == "category_B" ) {

       // select specific field and apply calculation on it
       val y = filtered.select(...)

       // write final DF
       x.write.parquet("some/path")

   } else if ( ... ) {

      // other kind of calculation
      // write results

   } else {

      // other kind of calculation
      // write results

   }

}

Такой алгоритм успешно работает.SQL-запрос Spark полностью распределен.Однако конкретная работа, выполняемая с каждым результирующим разделом, выполняется последовательно, и результат является неэффективным, особенно потому, что каждая запись, связанная с разделом, выполняется последовательно.

В таком случае, какие способы заменить «на выход»"чем-то параллельно / асинхронно?

Спасибо

1 Ответ

0 голосов
/ 01 октября 2018
  1. Вы можете использовать foreachPartition при записи в хранилища данных вне области видимости Hadoop со специальной логикой, необходимой для этой конкретной среды.

  2. Остальное и т. Д.

  3. .par параллельные коллекции (Scala) - но это используется с осторожностью.Для чтения файлов и их предварительной обработки, в противном случае, возможно, считается рискованным.

  4. Потоки.

  5. Вам необходимо проверить, что вы делаете, и можно ли ссылаться на операции, использовать их в блоке foreachPartition и т. Д.Вы должны попробовать, так как некоторые аспекты могут быть написаны только для водителя, а затем распространены среди исполнителей через SPARK для рабочих.Но вы не можете написать, например, spark.sql для работника, как показано ниже - в конце из-за некоторых ошибок аспекта форматирования, которые я только что получил здесь в блоке текста.См. Конец поста.

  6. Аналогично, df.write или df.read также нельзя использовать ниже.Что вы можете сделать, так это написать отдельные операторы execute / mutate, скажем, ORACLE, mySQL.

Надеюсь, это поможет.

rdd.foreachPartition(iter => {
       while(iter.hasNext) {
         val item = iter.next()
         // do something
         spark.sql("INSERT INTO tableX VALUES(2,7, 'CORN', 100, item)")
         // do some other stuff
  })

или

RDD.foreachPartition (records => {       
  val JDBCDriver = "com.mysql.jdbc.Driver" ...
  ...
  connectionProperties.put("user", s"${jdbcUsername}")
  connectionProperties.put("password", s"${jdbcPassword}")
 val connection = DriverManager.getConnection(ConnectionURL, jdbcUsername, jdbcPassword)
  ...
  val mutateStatement = connection.createStatement()
  val queryStatement = connection.createStatement()
  ...
      records.foreach (record => { 
              val val1 = record._1
              val val2 = record._2
              ...
              mutateStatement.execute (s"insert into sample (k,v) values(${val1}, ${nIterVal})")      
            })
  }            
)   
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...