Соединения Spark Table - Проблема распределения ресурсов - PullRequest
1 голос
/ 13 октября 2019

Я застрял при работе со столами улья с использованием искрового кластера (пряжа на месте)У меня есть около 7 таблиц, к которым мне нужно присоединиться, а затем заменить некоторые нулевые значения и записать результат обратно в Hive final DF.

Я использую spark SQL (Scala), сначала создавая 6 различных фреймов данных. а затем соедините все кадры данных и запишите результат в таблицу кустов.

Через пять минут мой код выдает ошибку ниже, что, как я знаю, связано с неправильной настройкой распределения ресурсов.

19/10/13 06:46:53 ERROR client.TransportResponseHandler: Still have 2 requests outstanding when connection from /100.66.0.1:36467 is closed
19/10/13 06:46:53 ERROR cluster.YarnScheduler: Lost executor 401 on aaaa-bd10.pq.internal.myfove.com: Container container_e33_1570683426425_4555_01_000414 exited from explicit termination request.
19/10/13 06:47:02 ERROR cluster.YarnScheduler: Lost executor 391 on aaaa-bd10.pq.internal.myfove.com: Container marked as failed: container_e33_1570683426425_4555_01_000403 on host: aaaa-bd10.pq.internal.myfove.com. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
Killed by external signal

Моя аппаратная спецификация

HostName    Memory in GB    CPU Memory for Yarn CPU For Yarn
Node 1      126             32  90               26
Node 2      126             32  90               26
Node 3      126             32  90               26
Node 4      126             32  90               26

Как правильно установить указанные ниже переменные, чтобы мой код не выдавал ошибку (контейнер помечен как сбойный - уничтожен запросом 143)?

Iпробую другую конфигурацию, но пока ничего не помогло.

val spark = (SparkSession.builder
             .appName("Final Table")
             .config("spark.driver.memory", "5g") 
             .config("spark.executor.memory", "15g") 
             .config("spark.dynamicAllocation.maxExecutors","6")
             .config("spark.executor.cores", "5")
             .enableHiveSupport()
             .getOrCreate())



DF1 = spark.sqk("Select * from table_1") //1.4 million records and 10 var 
DF2 = spark.sqk("Select * from table_2") //1.4 million records and 3000 
DF3 = spark.sqk("Select * from table_3") //1.4 million records and 300 
DF4 = spark.sqk("Select * from table_4") //1.4 million records and 600 
DF5 = spark.sqk("Select * from table_5") //1.4 million records and 150 
DF6 = spark.sqk("Select * from table_6") //1.4 million records and 2 
DF7 = spark.sqk("Select * from table_7") //1.4 million records and 12 


val joinDF1 = df1.join(df2, df1("number") === df2("number"), "left_outer").drop(df2("number")) 
val joinDF2 = joinDF1.join(df3,joinDF1("number") === df3("number"), "left_outer").drop(df3("number")) 
val joinDF3 = joinDF2.join(df4,joinDF2("number") === df4("number"), "left_outer").drop(df4("number")) 
val joinDF4 = joinDF3.join(df5,joinDF3("number") === df5("number"), "left_outer").drop(df5("number")) 
val joinDF5 = joinDF4.join(df6,joinDF4("number") === df6("number"), "left_outer").drop(df6("number")).drop("Dt") 
val joinDF6 = joinDF5.join(df7,joinDF5("number") === df7("number"), "left_outer").drop(df7("number")).drop("Dt") 
joinDF6.createOrReplaceTempView("joinDF6")

spark.sql("create table hive table as select * from joinDF6")

1 Ответ

0 голосов
/ 14 октября 2019

Пожалуйста, проверьте ваши yarn.nodemanager.log-dirs в Ambari, если вы используете Ambari. Если нет, попробуйте найти это свойство, и если оно указывает на каталог, где у вас очень мало места, измените его на другой каталог, в котором больше места.

Во время выполнения задач cotainers создают блоки, которые сохраняются в расположении yarn.nodemanger.log-dirs, если его недостаточно для хранения контейнера, контейнер начинает сбой.

...