Spark SQL Набор данных [Row] собирает данные для броска драйвера java .io.EOFException: Преждевременный EOF: префикс длины недоступен - PullRequest
0 голосов
/ 14 апреля 2020

при использовании Spark для чтения набора данных с помощью следующего кода:

val df: Dataset[Row] = spark.read.format("csv).schema(schema).load("hdfs://master:9000/mydata")

Затем я хочу собрать данные для драйвера:

val rows_array: Array[Row] = df.collect()

Произошла ошибка:

java.io.EOFException: Premature EOF: no length prefix available
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:244)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:244)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStream$ResponseProcessor.run(DFSOutputStream.java:733)

Причиной этой ошибки является слишком много элементов данных, потому что когда я использую

val rows_array: Array[Row] = df.take(10,000,000)

, он может успешно работать. Но когда я использую

val rows_array: Array[Row] = df.take(100,000,000)

, снова появляется ошибка (Успешно запустится на следующий день, но все еще возникают ошибки при получении всех данных)

Среда:
Spark 2.4.3
Было oop 2.7.7
Всего строк данных около 800 000 000, 12 ГБ и памяти достаточно.

------------------ --------------------------------------- строка редактирования --------- -----------------------------------------------

Сегодня я снова запустил его, используя приведенный ниже код:

val fields = Array.range(0, 2).map(i => StructField(s"col$i", IntegerType))
val schema: StructType = new StructType(fields)
val spark: SparkSession = SparkSession.builder.appName("test").getOrCreate
val df: Dataset[Row] = spark.read.format("csv).schema(schema).load("hdfs://master:9000/mydata")

df.cache()
df.count()

val df_rows: Array[Row] = df.collect()
print("df_rows[0] + df_rows(0))
print("df_rows size:" + df_rows.length)

Я подаю заявку:

{SPARK_HOME}/bin/spark-submit --master spark://master:7077 \
    --conf spark.executor.cores=35 \
    --total-executor-cores 105 \
    --executor-memory 145g \
    --driver-memory 200g \
    --conf "spark.executor.extraJavaOptions=-Xms145g" \
    --conf "spark.driver.extraJavaOptions=-Xms200g"

Размер файла CSV, который я прочитал, составляет около 12 ГБ, каждый строка в файле csv содержит два целых числа, а память, занятая функцией df.cache (), составляет 5,5 ГБ.

Среда:
Spark 2.4.3
Имеет oop 2.7.7
Всего строк данных около 800 000 000, 12 ГБ
В кластере три машины, и все они рабочие, память узла, отправляющего задание, составляет 370 ГБ (узел драйвера)

Я наблюдал за сетью искры и все задачи были успешными завершено (время выполнения около 60 с), но через десять минут в оболочке будет сгенерирована ошибка (Внимание: процесс не завершился, и файл в HDFS исправен):

WARN hdfs.dfsClient: DFSOutputStream ResponseProcessor exception for block xxxxx
java.io.EOFException: Premature EOF: no length prefix available
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:244)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:244)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStream$ResponseProcessor.run(DFSOutputStream.java:733)
WARN hdfs.DFSClient: Error Recovery for block xxxxx in pipeline DatanodeInfoWithStorage[ipxxx,DISK],DatanodeInfoWithStorage[ipxxx,DISK],DatanodeInfoWithStorage[ipxxx,DISK]
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
    at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
    at org.apache.hadoop.net.SocketIOWithTImeout.doIO(SocketIOWithTimeout.java:142)
    at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
    at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
    at java.io.BUfferedOutputStream.flushBuffer(BufferOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputSream.java:140)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStream.run(DFSOutputStream.java:508)

И следующие две строки кода не выполнялись, потому что я не вижу вывод:

print("df_rows[0] + df_rows(0))
print("df_rows size:" + df_rows.length)

Кроме того, я также контролировал использование памяти машины, на которой находится и обнаружен драйвер что объем используемой памяти увеличивается (после 30 минут работы процесс отправки [драйвер] занимает около 120 ГБ)

2020.04.16:

Другая информация:

Преобразованный в rdd может успешно работать:

val rows_array: Array[Row] = df.rdd.collect()

Но в этом случае потребление памяти очень дорого (выделенная мною память почти израсходована).

Кто-нибудь знает причину? Другой вопрос: почему используется такое большое количество памяти?

...