Итерация по Spark DataFrame занимает большое время и завершается ошибкой OutOfMemoryError: превышен предел издержек GC - PullRequest
0 голосов
/ 18 мая 2018

Я обрабатываю 5M записей, представленных в XML.Я загружаю их в Spark Dataframe, а затем пытаюсь загрузить их в HBase, используя метод foref для dataframe.Я получаю из-за ошибки памяти после нескольких времени обработки вокруг самого foreach или чрезвычайно медленной загрузки.Кто-нибудь может предложить какое-либо решение или лучший подход?

Код :

val xmlSchemaXML = StructType(Array(
    StructField("A", StringType, nullable = true),
    StructField("B", StringType, nullable = true),
    StructField("C", StringType, nullable = true),
    StructField("D", StringType, nullable = true))
  )

  //Get File In DataFrame
  var dfXML = sqlContext.read.format("com.databricks.spark.xml")
    .option("rootTag", "ABC")
    .option("rowTag", "AB")
    .schema(xmlSchemaXML)
    .load("file:///home/xyz.xml")
    .withColumn("as_of_date", current_date())
    .withColumn("last_updated_date", current_timestamp())


  //Create HBase Configuration
  val hBaseConf = HBaseConfiguration.create()

  //Set HBase Configurations
  hBaseConf.set("hadoop.security.authentication", "kerberos")
  hBaseConf.set("hbase.zookeeper.quorum", cluster)
  hBaseConf.set("hbase.zookeeper.property.client.port", "2181")

  //Login Using KeyTab
  UserGroupInformation.setConfiguration(hBaseConf)
  UserGroupInformation.loginUserFromKeytab("user", "file:///tmp/keytab.keytab")

  println("Creating Connection With HBase...")


  val hBaseAdmin = new HBaseAdmin(hBaseConf)

  /** *************Check if Table Already Exists or Create One ***************/

  if (!hBaseAdmin.isTableAvailable("ns:table_name")) {
    println("ns:table_name does not exist...")
    val tableDescriptor = new HTableDescriptor(TableD.valueOf("ns:table_name"))
    val columnDescriptor = new HColumnDescriptor(Bytes.toBytes("cf"))
    columnDescriptor.setVersions(1, 15)

    try {
      tableDescriptor.addFamily(columnDescriptor)
      hBaseAdmin.createTable(tableDescriptor)
      println("ns:table_name created...")
    }
    catch {
      case _: Throwable => println("table creation failed...")
    }
  }


  /** **************GET RECORD COUNT FROM ns:table_name ****************/
  var rowKeyCount: Long = 0
  try {
    hBaseConf.set(TableInputFormat.INPUT_TABLE, "ns:table_name")
    val hBaseRDD = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println("Number of Records found in ns:table_name: " + hBaseRDD.count())
    rowKeyCount = hBaseRDD.count()
    println("====================================================================================")
  }
  catch {
    case _: Throwable => println("table reading failed...")
    case npe: NullPointerException =>
      println("Result NullPointerException: Table does not exist")
      rowKeyCount = 0
  }

  println("----------------START READING DATA FROM DATAFRAME AND LOAD TO HBASE----------------")

  //Create HTable for ns:table_name
  val hTable = new HTable(hBaseConf, "ns:table_name")

  println("Total Rows in File: " + dfXML.count())


  var A = ""
  var B = ""
  var C = ""
  var D = ""
  var as_of_date = ""
  var last_updated_date = ""

  dfXML.limit(100000).collect().foreach(f = elem => {

    //println(elem)
    rowKeyCount = rowKeyCount + 1

    //println("0")
     if (elem.getString(0) == null)
       A = ""
     else
    A = elem.getString(0)

    //println("1")
    if (elem.getString(1) == null)
      B = ""
    else
    B = elem.getString(1)


    //println("2")
    if (elem.getString(2) == null)
      C = ""
    else
    C = elem.getString(2)

    //println("3")
    if (elem.getString(3) == null)
      D = ""
    else
    D = elem.getString(3)

    //println("4")        
    as_of_date = elem.getDate(4).toString

    //println("5")        
    last_updated_date = elem.getTimestamp(5).toString


    var put = new Put(rowKeyCount.toString.getBytes()); //Store RowKey

    put.addColumn("cf".getBytes(), "A".getBytes(), A.getBytes())
    put.addColumn("cf".getBytes(), "B".getBytes(), B.getBytes())
    put.addColumn("cf".getBytes(), "C".getBytes(), C.getBytes())
    put.addColumn("cf".getBytes(), "D".getBytes(), D.getBytes())
    put.addColumn("cf".getBytes(), "as_of_date".getBytes(), as_of_date.getBytes())
    put.addColumn("cf".getBytes(), "last_updated_date".getBytes(), last_updated_date.getBytes())

    //Commit to HBaseDB        
    hTable.put(put);
    //println(rowKeyCount + " : Record written to HBase...")

  })

  hTable.flushCommits();

1 Ответ

0 голосов
/ 18 мая 2018

Что вам нужно сделать, это увеличить 100 разделов по умолчанию до чего-то более чувствительного к вашей рабочей нагрузке.Пожалуйста, начните с df.repartition(1000). foreachPartition(..., а затем посмотрите, 1000 слишком много или мало.

5M записей не кажется большим количеством, скорее всего, у вас либо большие записи, либо недостаточно места в куче, выделенного для исполнителей.

...