Я читаю файлы из S3 в DataFrame, ограничивая записи до 100. Затем я добавляю около 10 столбцов в этот DataFrame.
Я вижу новую схему, и она показывает, что столбцыдобавлено:
Но когда я выполняю какие-либо действия с окончательным DataFrame (в данном случае existingDF
), задание Spark не запускается.
В интерфейсе Spark все, что я вижу, это:
collect at <console>:52
Для чтения файлов с S3.
Само задание не завершается и продолжает работать, но в интерфейсе Spark ничего не отображается.
Iпроверил логи, и он показывает, что все исполнители были удалены один за другим, а также в интерфейсе Spark он показывает, что все исполнители не работают, ожидают драйвера.
Еще одна странность в том, что в интерфейсе Spark отображаются ядра драйверовкак 0
, но каждый исполнитель имеет 4
ядер.Я настроил как драйвер, так и исполнителей на наличие 4
ядер.
Я попытался использовать val
вместо var
и явно создал новый DataFrame для каждой операции, но это дает тот же результат.
Я также пытался сохранить DataFrame на DISK после чтения из S3, что также не помогает.
Вот основной код:
import org.apache.spark.sql.{DataFrame, _}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ListBuffer
import scala.util.Try
def columnExists(df: DataFrame, path: String): Boolean = Try(df(path)).isSuccess
var existingDF = sqlContext.read.format("parquet").option("basePath", s"$basePath").schema(
patchedSchema).load(filePaths: _*).limit(100)
if (!columnExists(existingDF, "column_one.abc")) {
existingDF = existingDF.withColumn(
"column_one",
struct(
$"column_one.*",
struct(
lit(null).cast("long").as("first_val"),
lit(null).cast("string").as("second_val")
).as("abc")
)
)
println("Column added")
}
if (!columnExists(existingDF, "column_one.def")) {
existingDF = existingDF.withColumn(
"column_one",
struct(
$"column_one.*",
struct(
lit(null).cast("long").as("first_val"),
lit(null).cast("string").as("second_val")
).as("def")
)
)
println("Column added")
}
if (!columnExists(existingDF, "column_one.ghi")) {
existingDF = existingDF.withColumn(
"column_one",
struct(
$"column_one.*",
struct(
lit(null).cast("long").as("first_val"),
lit(null).cast("string").as("second_val")
).as("ghi")
)
)
println("Column added")
}
Даже если запустить,
existingDF.explain
это не работает.
Все работает, если я просто добавляю 1 или 2 столбца, но при добавлении 3-го столбца проблема начинается.
Я пробовал на Zeppelin,он показывает, что пункт запущен, но он никогда не завершается, и в интерфейсе Spark ничего не отображается.
Я также попытался запустить его как шаг с консоли EMR, но не повезло.
Вот кластери конфигурация Spark:
Master node (1) - 4 vCore, 8 GiB memory, EBS only storage. 32 GiB
Core nodes (5) - 16 vCore, 32 GiB memory, EBS only storage. 64 GiB
spark.submit.deployMode=cluster
spark.executor.memory=20g
--num-executors=4
--driver-memory=20g
spark.driver.cores=4
spark.executor.cores=4
Я предполагаю, что это не может быть проблемой с памятью, потому что у меня только 100 записей и более чем достаточно памяти для размещения этих записей.
Дайте мне знать, что янужно добавить больше деталейИлс, о чем угодно.