Замена символа в строковых столбцах фрейма данных в Scala - PullRequest
0 голосов
/ 21 ноября 2018

У меня есть задача удалить все разделители строк (\ n) из всех строковых столбцов в таблице.Число столбцов таблицы неизвестно, код должен обрабатывать любую таблицу.

Я написал код, который будет проходить через все столбцы в цикле, извлекать тип данных столбца и заменять разделитель строк:

  //let's assume we already have a dataframe 'df' that can contain any table
  df.cache()
  val dfTypes = df.dtypes
  for ( i <- 0 to (dfTypes.length - 1)) {
    var tupCol = dfTypes(i)
    if (tupCol._2 == "StringType" )
      df.unpersist()
      df = df.withColumn(tupCol._1, regexp_replace(col(tupCol._1), "\n", " "))
      df.cache()
  }
  df.unpersist()

Сам код работает нормально, но когда я запускаю этот код для ~ 50 таблиц параллельно, я постоянно получаю следующую ошибку для одной случайной таблицы:

18/11/20 04:31:41 WARN TaskSetManager: Lost task 9.0 in stage 6.0 (TID 29, ip-10-114-4-145.us-west-2.compute.internal, executor 1): java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at net.jpountz.lz4.LZ4BlockOutputStream.finish(LZ4BlockOutputStream.java:260)
at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:190)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.close(UnsafeRowSerializer.scala:96)
at org.apache.spark.storage.DiskBlockObjectWriter.commitAndGet(DiskBlockObjectWriter.scala:173)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:156)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Я могу запустить меньше или больше, чем50 рабочих мест, но единственное (случайное) продолжает терпеть неудачу.

Задания выполняются в кластере EMR со следующей конфигурацией:

Главный узел: r4.2xlarge x 1

Основные узлы: m5.2xlarge x 3

Узлы задач: m5.2xlarge x (автоматическое масштабирование от 1 до 10)

Я думаю, что мой код потребляет много памяти и дискового пространства, потому что он создает новые циклы данных в цикле.Но я не вижу другого решения для обработки таблицы, не зная количество строковых столбцов.Мне нужно предложение о том, как оптимизировать код.Благодарю.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...