Spark cannot read Erasure Coded csv files in Hadoop 3
October 10, 2019

I have built a 3-node cluster on Hadoop 3 and want to compare the performance of the erasure coding policies with default replication.

I copied a .csv file into two folders in my HDFS, one using the EC policy XOR-2-1-1024k and the other using default replication. Reading the default-replication file with Spark works fine, but when I try to do the same for the EC folder, it reports that it cannot find the blocks (I sketch the commands I used to set up the folders below, after the stack trace).

java.io.IOException: Got error, status message opReadBlock BP-1584172113-192.168.56.102-1570543698813:blk_-9223372036854775792_1001 received exception java.io.IOException:  Offset 0 and length 268435456 don't match block BP-1584172113-192.168.56.102-1570543698813:blk_-9223372036854775792_1001 ( blockLen 134217728 ), for OP_READ_BLOCK, self=/192.168.56.102:53620, remote=/192.168.56.102:9866, for file /ec/data1.csv, for pool BP-1584172113-192.168.56.102-1570543698813 block -9223372036854775792_1001
    at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
    at org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
    at org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
    at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:818)
    at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:697)
    at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
    at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:69)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2019-10-10 17:06:29,147 WARN hdfs.DFSClient: Failed to connect to /192.168.56.102:9866 for block, add to deadNodes and continue. java.io.IOException: Got error, status message opReadBlock BP-1584172113-192.168.56.102-1570543698813:blk_-9223372036854775792_1001 received exception java.io.IOException:  Offset 0 and length 268435456 don't match block BP-1584172113-192.168.56.102-1570543698813:blk_-9223372036854775792_1001 ( blockLen 134217728 ), for OP_READ_BLOCK, self=/192.168.56.102:53620, remote=/192.168.56.102:9866, for file /ec/data1.csv, for pool BP-1584172113-192.168.56.102-1570543698813 block -9223372036854775792_1001
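
For reference, the two folders were set up roughly like this (a sketch using the standard Hadoop 3 hdfs ec CLI; the policy name matches the one above, and the exact commands may differ slightly from what I actually ran):

# XOR-2-1-1024k is not enabled out of the box (only RS-6-3-1024k is), so enable it first
hdfs ec -enablePolicy -policy XOR-2-1-1024k

# create both directories and set the EC policy on one of them
hdfs dfs -mkdir -p /ec /rep
hdfs ec -setPolicy -path /ec -policy XOR-2-1-1024k

# copy the same file into both; /rep keeps the default replication
hdfs dfs -put data1.csv /ec/
hdfs dfs -put data1.csv /rep/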

fsck and dfsadmin show no errors, and I can see the blocks in the web UI and in my folders.
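
The checks were roughly these (the first two are the ones mentioned above; the last two are extra sanity checks, sketched here, that narrow down whether the problem is Spark-specific):

hdfs fsck /ec/data1.csv -files -blocks -locations
hdfs dfsadmin -report

hdfs ec -getPolicy -path /ec
# check whether a plain HDFS client read works outside of Spark
hdfs dfs -cat /ec/data1.csv | head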

What I use to read the files:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("Word Count")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()


df = spark.read.option("header", "false")\
    .option("delimiter", ",")\
    .option("inferSchema", "true")\
    .csv("/rep/data1.csv")            ## also tried .text("/rep/data1.csv")

df.show(2, truncate=False)
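
Reading the EC copy is the identical call, just pointed at the /ec path; this is the read that throws the exception above:

df_ec = spark.read.option("header", "false")\
    .option("delimiter", ",")\
    .option("inferSchema", "true")\
    .csv("/ec/data1.csv")    # fails with the opReadBlock IOException shown above

df_ec.show(2, truncate=False)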