Я построил кластер из 3 узлов на Hadoop3 и хочу сравнить производительность политик кодирования erasure с репликацией по умолчанию.
Я скопировал файл .csv в две папки внутри моих hdf-файлов, одна с помощьюполитика EC XOR-2-1-1024k и другая, использующая репликацию по умолчанию. Чтение репутации по умолчаниюфайл с помощью spark работает гладко, а когда я пытаюсь сделать то же самое для папки EC, выводится, что он не может найти блоки.
java.io.IOException: Got error, status message opReadBlock BP-1584172113-192.168.56.102-1570543698813:blk_-9223372036854775792_1001 received exception java.io.IOException: Offset 0 and length 268435456 don't match block BP-1584172113-192.168.56.102-1570543698813:blk_-9223372036854775792_1001 ( blockLen 134217728 ), for OP_READ_BLOCK, self=/192.168.56.102:53620, remote=/192.168.56.102:9866, for file /ec/data1.csv, for pool BP-1584172113-192.168.56.102-1570543698813 block -9223372036854775792_1001
at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
at org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
at org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:818)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:697)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:69)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-10-10 17:06:29,147 WARN hdfs.DFSClient: Failed to connect to /192.168.56.102:9866 for block, add to deadNodes and continue. java.io.IOException: Got error, status message opReadBlock BP-1584172113-192.168.56.102-1570543698813:blk_-9223372036854775792_1001 received exception java.io.IOException: Offset 0 and length 268435456 don't match block BP-1584172113-192.168.56.102-1570543698813:blk_-9223372036854775792_1001 ( blockLen 134217728 ), for OP_READ_BLOCK, self=/192.168.56.102:53620, remote=/192.168.56.102:9866, for file /ec/data1.csv, for pool BP-1584172113-192.168.56.102-1570543698813 block -9223372036854775792_1001
fsck и dfsadmin не показывают ошибок, и я вижу блокив веб-интерфейсе и в моих папках.
Что я использую для чтения файлов:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Word Count")\
.config("spark.some.config.option", "some-value")\
.getOrCreate()
df = spark.read.option("header", "false")\
.option("delimiter", ",")\
.option("inferSchema", "true")\
.csv("/rep/data1.csv") ## also tried .text("/rep/data1.csv")
df.show(2, truncate=False)