Остановка работы Spark при добавлении новых столбцов в DataFrame - PullRequest
0 голосов
/ 26 апреля 2019

Я читаю файлы из S3 в DataFrame, ограничивая записи до 100. Затем я добавляю около 10 столбцов в этот DataFrame.

Я вижу новую схему, и она показывает, что столбцыдобавлено:

Но когда я выполняю какие-либо действия с окончательным DataFrame (в данном случае existingDF), задание Spark не запускается.

В интерфейсе Spark все, что я вижу, это:

collect at <console>:52

Для чтения файлов с S3.

Само задание не завершается и продолжает работать, но в интерфейсе Spark ничего не отображается.

Iпроверил логи, и он показывает, что все исполнители были удалены один за другим, а также в интерфейсе Spark он показывает, что все исполнители не работают, ожидают драйвера.

Еще одна странность в том, что в интерфейсе Spark отображаются ядра драйверовкак 0, но каждый исполнитель имеет 4 ядер.Я настроил как драйвер, так и исполнителей на наличие 4 ядер.

Я попытался использовать val вместо var и явно создал новый DataFrame для каждой операции, но это дает тот же результат.

Я также пытался сохранить DataFrame на DISK после чтения из S3, что также не помогает.

Вот основной код:

import org.apache.spark.sql.{DataFrame, _}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ListBuffer
import scala.util.Try

def columnExists(df: DataFrame, path: String): Boolean = Try(df(path)).isSuccess

var existingDF = sqlContext.read.format("parquet").option("basePath", s"$basePath").schema(
      patchedSchema).load(filePaths: _*).limit(100)

if (!columnExists(existingDF, "column_one.abc")) {
      existingDF = existingDF.withColumn(
        "column_one",
        struct(
          $"column_one.*",
          struct(
            lit(null).cast("long").as("first_val"),
            lit(null).cast("string").as("second_val")
          ).as("abc")
        )
      )

      println("Column added")
}

if (!columnExists(existingDF, "column_one.def")) {
      existingDF = existingDF.withColumn(
        "column_one",
        struct(
          $"column_one.*",
          struct(
            lit(null).cast("long").as("first_val"),
            lit(null).cast("string").as("second_val")
          ).as("def")
        )
      )

      println("Column added")
}

if (!columnExists(existingDF, "column_one.ghi")) {
      existingDF = existingDF.withColumn(
        "column_one",
        struct(
          $"column_one.*",
          struct(
            lit(null).cast("long").as("first_val"),
            lit(null).cast("string").as("second_val")
          ).as("ghi")
        )
      )

      println("Column added")
}

Даже если запустить,

existingDF.explain

это не работает.

Все работает, если я просто добавляю 1 или 2 столбца, но при добавлении 3-го столбца проблема начинается.

Я пробовал на Zeppelin,он показывает, что пункт запущен, но он никогда не завершается, и в интерфейсе Spark ничего не отображается.

Я также попытался запустить его как шаг с консоли EMR, но не повезло.

Вот кластери конфигурация Spark:

Master node (1) - 4 vCore, 8 GiB memory, EBS only storage. 32 GiB

Core nodes (5) - 16 vCore, 32 GiB memory, EBS only storage. 64 GiB


spark.submit.deployMode=cluster 
spark.executor.memory=20g
--num-executors=4 
--driver-memory=20g
spark.driver.cores=4
spark.executor.cores=4 

Я предполагаю, что это не может быть проблемой с памятью, потому что у меня только 100 записей и более чем достаточно памяти для размещения этих записей.

Дайте мне знать, что янужно добавить больше деталейИлс, о чем угодно.

...