Как эффективно преобразовать ListBuffer [ListBuffer [String]] в несколько фреймов данных и записать их с помощью Spark Scala - PullRequest
1 голос
/ 31 октября 2019

Я пытаюсь проанализировать набор XML-файлов, используя Scala и Spark. Я получаю данные для 'n' фреймов данных из файлов (т. Е. Количество фреймов данных не меняется, меняется только количество файлов)

Я анализирую набор файлов XML и сохраняю данные в ListBuffer[ListBuffer[String]]. Каждый из ListBuffer[String] содержит данные для фрейма данных. например:

ListBuffer[
    ListBuffer["1|2|3|4","5|6|7|8"],
    ListBuffer["a|b|c|d","e|f|g|h"],
    ListBuffer["q|w|e|r","w|x|y|z"]
]

Это создаст 3 кадра данных:

Dataframe1:
 col1  col2  col3  col4
   1     2     3     4
   5     6     7     8

и аналогично другим 2 кадрам данных.

Я не могу напрямую преобразовать XML в Датафрейм, так какПеред созданием dataframe необходимо выполнить много пользовательских обработок.

Я преобразую ListBuffer в Dataframe, используя следующий код:

finalListBuffer.foreach{ data =>

    columns = FunctionToReturnColumnsList()
    val schema = StructType(columns.map(field => StructField(field, StringType, true)))
    val dataRDD: RDD[Row] = sparkSess.sparkContext.parallelize(data.toStream.map(l => Row.fromSeq(l.split("|", -1))))
    val df = sparkSess.createDataFrame(dataRDD, schema)
    ...
}

После этого шага некоторые операции выполняютсявыполняется на каждом фрейме данных (некоторые операции имеют зависимость между фреймами данных, поэтому я не могу просто обработать один фрейм данных, а затем записать), и, наконец, фреймы данных записываются с использованием следующего кода:

df.repartition(1).write.mode("Overwrite").option("multiline", "true").option("delimiter", "\u0017").csv(filename)

При выполнении этих шагов яполучаю 2 проблемы при большом размере входного файла:

1) Превышен лимит накладных расходов ГХ при создании кадра данных. (Шаг, в котором создается переменная dataRDD)

2) Удар сердцебиенияошибка тайм-аута при записи df.

Как решить эти проблемы?

Я думал об использовании ListBuffer[RDD[String]] изначально вместо ListBuffer[ListBuffer[String]]

Но может быть какмлюбой как 1 миллион файлов, и каждый файл может иметь до 10-20 записей для df. Что я делаю, так это перечисляю все файлы, обрабатываю каждый из них по одному и добавляю их результат в основной ListBuffer. Так что, если я использую RDD, мне придется использовать, union для каждого файла, и это может быть дорого. Что еще можно сделать?

1 Ответ

0 голосов
/ 31 октября 2019
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer

scala> val lbs = ListBuffer(
     |     ListBuffer("1|2|3|4","5|6|7|8"),
     |     ListBuffer("a|b|c|d","e|f|g|h"),
     |     ListBuffer("q|w|e|r","w|x|y|z")
     | )
lbs: scala.collection.mutable.ListBuffer[scala.collection.mutable.ListBuffer[String]] = ListBuffer(ListBuffer(1|2|3|4, 5|6|7|8), ListBuffer(a|b|c|d, e|f|g|h), ListBuffer(q|w|e|r, w|x|y|z))


scala> val schema = StructType(Seq(StructField("c1", StringType, true),StructField("c2", StringType, true),StructField("c3", StringType, true),StructField("c4", StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(c1,StringType,true), StructField(c2,StringType,true), StructField(c3,StringType,true), StructField(c4,StringType,true))

scala> var lb_df: ListBuffer[DataFrame] = ListBuffer()
lb_df: scala.collection.mutable.ListBuffer[org.apache.spark.sql.DataFrame] = ListBuffer()

scala> def createDF(lb: ListBuffer[String]) = spark.createDataFrame(spark.sparkContext.parallelize(lb.toSeq).map(_.toString.split("\\|")).map(Row(_: _*)), schema)
createDF: (lb: scala.collection.mutable.ListBuffer[String])org.apache.spark.sql.DataFrame


scala> lbs.foreach(lb => lb_df.append(createDF(lb)))

scala> lb_df.foreach(_.show())
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  1|  2|  3|  4|
|  5|  6|  7|  8|
+---+---+---+---+

+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  a|  b|  c|  d|
|  e|  f|  g|  h|
+---+---+---+---+

+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  q|  w|  e|  r|
|  w|  x|  y|  z|
+---+---+---+---+

Надеюсь, это полезно.

...