Оптимизация Spark Scala Job - много задач, вложенное отображение занимает несколько часов, XML Parse - PullRequest
0 голосов
/ 01 марта 2020

Я выполняю задание spark, выполняемое в оболочке spark, которая выполняется в течение более 80 часов, и должен быть какой-то способ ее распространения. Вот конфигурации, которые я отправил при запуске задания, и код, который выполняется.

spark-shell --master \
yarn \
--num-executors 100 \
--name cde_test \
--executor-cores 4 \
--executor-memory 5g \
--driver-cores 2 \
--driver-memory 3g \
--jars ./spark_jars/spark-xml_2.11-0.8.0.jar \
--verbose

Вот пи c информации об исполнителях в инструменте пользовательского интерфейса менеджера ресурсов: spark_ui_executor_screenshot

Я хочу проанализировать XML файлы с помощью spark - xml и извлечь определенные поля и сохранить в CSV. Я думал, что увеличение числа исполнителей ускорит работу, поскольку это небольшие и быстрые задачи с низким объемом памяти, но я не уверен, правильно ли я сделал или способ написания кода помешает параллельному выполнению. Код ниже, и любая помощь приветствуется.

import org.apache.hadoop.fs._
import collection.mutable._
import spark.implicits._
import java.io.File
import java.util.regex.Pattern
import org.apache.spark.sql._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.util.control.Exception._
import org.apache.commons.io.FilenameUtils  
import org.apache.commons.lang.StringEscapeUtils
import org.apache.hadoop.conf.Configuration

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}

object HdfsUtils {
  def pathExists(path: String, sc: SparkContext): Boolean = {
    val conf = sc.hadoopConfiguration
    val fs = FileSystem.get(conf)
    fs.exists(new Path(path))
  }

  def getFullPath(path:String, sc: SparkContext): String = {
    val conf = sc.hadoopConfiguration
    val fs = FileSystem.get(conf)
    fs.getFileStatus(new Path(path)).getPath().toString
  }

  def getAllFiles(path:String, sc: SparkContext): Seq[String] = {
    val conf = sc.hadoopConfiguration
    val fs = FileSystem.get(conf)
    val files = fs.listStatus(new Path(path))
    files.map(_.getPath().toString)
  }
}

//Four different mapping functions
val path_list = Seq("path_1_for_first_directory",
"path_2_for_second_directory")

path_list.foreach ( path => {
val hdfs_directory = HdfsUtils.getAllFiles(path, sc)

hdfs_directory.foreach( intermediate_folder => {
val intermediate_folders = HdfsUtils.getAllFiles(intermediate_folder, sc)

intermediate_folders.foreach( final_folder => {
val hdfs_files = HdfsUtils.getAllFiles(final_folder, sc)

hdfs_files.foreach( xml_file => {

val date = raw"(\d{4})-(\d{2})-(\d{2})".r
val directory_date = date.findFirstIn(xml_file).
getOrElse(xml_file)

//Ignore meta files
if (xml_file.contains("META") || xml_file.contains("meta")){


} else if (xml_file.contains(".xml") || xml_file.contains(".XML")){


try{

val xml_df = spark.
read.
format("xml").
option("rowTag","root").
option("treatEmptyValuesAsNulls","true").
option("nullValue", null).
option("emptyValue", null).
load(xml_file)

val info_df = xml_df.
select(
  substring($"column_1",0,8).alias("date"),
  substring($"column_2",9,20).alias("time"),
  $"column_3".alias("first_name").cast("string"),
  $"column_4".alias("last_name").cast("string"),
  $"column_5".alias("birthday").cast("string"),
  $"column_6".alias("street").cast("string"),
  $"column_7".alias("city").cast("string"),
  $"column_8".alias("state").cast("string"),
  $"column_9".alias("zip_code").cast("string"),
  $"column_10".alias("country").cast("string")
)

val outputfile = "/path_to_output/"
var filename = s"$directory_date"
var outputFileName = outputfile + filename 


info_df.write
    .format("csv")
    .option("header", "false")
    .option("sep","|")
    .mode("append")
    .save(outputFileName)

    } 

    catch{ 
      case _: RuntimeException => {}
      case _: Exception => {}
    }
}
})
})
})
})

1 Ответ

0 голосов
/ 02 марта 2020

Вы используете foreach для Seq, который является последовательным (как намекал Христо Илиев). Если у вас много файлов, которые в основном несколько малы, то обработка по одному может быть медленной.

  • Вы можете использовать подстановочные знаки вместо перебора файлов HDFS. Вы можете прочитать несколько файлов в больший DataFrame одновременно; например, здесь мы обрабатываем целый месяц за раз:
spark.read.format("xml").load("/somepath/*/YYYY-MM-*.xml")

Обратите внимание на /*/ для обозначения «промежуточного каталога». Что может быть лучше для вас, может зависеть от того, есть ли более конкретный шаблон c для этих промежуточных каталогов, или если они также зависят от даты.

...