при использовании Spark для чтения набора данных с помощью следующего кода:
val df: Dataset[Row] = spark.read.format("csv).schema(schema).load("hdfs://master:9000/mydata")
Затем я хочу собрать данные для драйвера:
val rows_array: Array[Row] = df.collect()
Произошла ошибка:
java.io.EOFException: Premature EOF: no length prefix available
at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:244)
at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:244)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStream$ResponseProcessor.run(DFSOutputStream.java:733)
Причиной этой ошибки является слишком много элементов данных, потому что когда я использую
val rows_array: Array[Row] = df.take(10,000,000)
, он может успешно работать. Но когда я использую
val rows_array: Array[Row] = df.take(100,000,000)
, снова появляется ошибка (Успешно запустится на следующий день, но все еще возникают ошибки при получении всех данных)
Среда:
Spark 2.4.3
Было oop 2.7.7
Всего строк данных около 800 000 000, 12 ГБ и памяти достаточно.
------------------ --------------------------------------- строка редактирования --------- -----------------------------------------------
Сегодня я снова запустил его, используя приведенный ниже код:
val fields = Array.range(0, 2).map(i => StructField(s"col$i", IntegerType))
val schema: StructType = new StructType(fields)
val spark: SparkSession = SparkSession.builder.appName("test").getOrCreate
val df: Dataset[Row] = spark.read.format("csv).schema(schema).load("hdfs://master:9000/mydata")
df.cache()
df.count()
val df_rows: Array[Row] = df.collect()
print("df_rows[0] + df_rows(0))
print("df_rows size:" + df_rows.length)
Я подаю заявку:
{SPARK_HOME}/bin/spark-submit --master spark://master:7077 \
--conf spark.executor.cores=35 \
--total-executor-cores 105 \
--executor-memory 145g \
--driver-memory 200g \
--conf "spark.executor.extraJavaOptions=-Xms145g" \
--conf "spark.driver.extraJavaOptions=-Xms200g"
Размер файла CSV, который я прочитал, составляет около 12 ГБ, каждый строка в файле csv содержит два целых числа, а память, занятая функцией df.cache (), составляет 5,5 ГБ.
Среда:
Spark 2.4.3
Имеет oop 2.7.7
Всего строк данных около 800 000 000, 12 ГБ
В кластере три машины, и все они рабочие, память узла, отправляющего задание, составляет 370 ГБ (узел драйвера)
Я наблюдал за сетью искры и все задачи были успешными завершено (время выполнения около 60 с), но через десять минут в оболочке будет сгенерирована ошибка (Внимание: процесс не завершился, и файл в HDFS исправен):
WARN hdfs.dfsClient: DFSOutputStream ResponseProcessor exception for block xxxxx
java.io.EOFException: Premature EOF: no length prefix available
at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:244)
at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:244)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStream$ResponseProcessor.run(DFSOutputStream.java:733)
WARN hdfs.DFSClient: Error Recovery for block xxxxx in pipeline DatanodeInfoWithStorage[ipxxx,DISK],DatanodeInfoWithStorage[ipxxx,DISK],DatanodeInfoWithStorage[ipxxx,DISK]
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
at org.apache.hadoop.net.SocketIOWithTImeout.doIO(SocketIOWithTimeout.java:142)
at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
at java.io.BUfferedOutputStream.flushBuffer(BufferOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputSream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStream.run(DFSOutputStream.java:508)
И следующие две строки кода не выполнялись, потому что я не вижу вывод:
print("df_rows[0] + df_rows(0))
print("df_rows size:" + df_rows.length)
Кроме того, я также контролировал использование памяти машины, на которой находится и обнаружен драйвер что объем используемой памяти увеличивается (после 30 минут работы процесс отправки [драйвер] занимает около 120 ГБ)
2020.04.16:
Другая информация:
Преобразованный в rdd может успешно работать:
val rows_array: Array[Row] = df.rdd.collect()
Но в этом случае потребление памяти очень дорого (выделенная мною память почти израсходована).
Кто-нибудь знает причину? Другой вопрос: почему используется такое большое количество памяти?