Невозможно создать RDD из данных в HDFS - PullRequest
0 голосов
/ 11 февраля 2020

Я пытаюсь создать СДР с использованием кода, но не могу этого сделать. Есть ли решение этой проблемы. Я попытался запустить его с localhost: сведения о порте. Я также попытался запустить его по всему пути HDFS: /user/training/intel/NYSE.csv. Любой путь, который я использую, ищется только в локальном каталоге, но не в hdfs. Спасибо

scala> val myrdd = sc.textFile("/training/intel/NYSE.csv")
myrdd: org.apache.spark.rdd.RDD[String] = /training/intel/NYSE.csv MapPartitionsRDD[5] at textFile at <console>:24

scala> myrdd.collect
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/training/intel/NYSE.csv
  at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
  at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
  ... 48 elided

Я также попробовал следующее:

scala> val myrdd = sc.textFile("hdfs://localhost:8020/training/intel/NYSE.csv")
myrdd: org.apache.spark.rdd.RDD[String] = hdfs://localhost:8020/training/intel/NYSE.csv MapPartitionsRDD[7] at textFile at <console>:24

scala> myrdd.collect
java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).; Host Details : local host is: "hadoop/127.0.0.1"; destination host is: "localhost":8020;
  at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
  at org.apache.hadoop.ipc.Client.call(Client.java:1479)
  at org.apache.hadoop.ipc.Client.call(Client.java:1412)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
  at com.sun.proxy.$Proxy24.getFileInfo(Unknown Source)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
  at com.sun.proxy.$Proxy25.getFileInfo(Unknown Source)
  at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
  at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
  at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
  at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
  at org.apache.hadoop.fs.Globber.glob(Globber.java:252)
  at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1674)
  at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:259)
  at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
  ... 48 elided
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
  at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:89)
  at com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:108)
  at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.<init>(RpcHeaderProtos.java:2201)
  at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.<init>(RpcHeaderProtos.java:2165)
  at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto$1.parsePartialFrom(RpcHeaderProtos.java:2295)
  at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto$1.parsePartialFrom(RpcHeaderProtos.java:2290)
  at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:200)
  at com.google.protobuf.AbstractParser.parsePartialDelimitedFrom(AbstractParser.java:241)
  at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:253)
  at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:259)
  at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:49)
  at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.parseDelimitedFrom(RpcHeaderProtos.java:3167)
  at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1086)
  at org.apache.hadoop.ipc.Client$Connection.run(Client.java:979)

Нет независимо от того, как я запускаю его, я получаю, что путь не существует. HDFS directory with File(NYSE.csv)

Ответы [ 3 ]

1 голос
/ 13 февраля 2020

Файлы находятся в HDFS.

Spark настроен для чтения из локальной файловой системы file:/

Вам необходимо отредактировать файл core-site. xml файл в Spark установочный каталог, чтобы убедиться, что fs.defaultFS правильно настроен для использования вашего Had oop Namenode

InvalidProtocolBufferException: сообщение протокола содержало недопустимый тег (ноль) .; Сведения о хосте: локальный хост: "hadoop / 127.0.0.1"; хост назначения: «localhost»: 8020;

Это может означать, что ваш клиент Spark HDFS не совместим с установленными серверными API Had oop или вы подключаетесь к неправильному порту

И, кроме того, файл не находится на hdfs:///training/...

Кроме того, HDFS не требуется для изучения Spark, поэтому, возможно, сначала попробуйте поиграть с командами hadoop fs или переместить файлы на локальный компьютер. система, в зависимости от ваших целей

0 голосов
/ 11 февраля 2020
 "/training/intel/NYSE.csv"

означает «начать поиск из каталога верхнего уровня». Либо измените его на "/user/training/intel/NYSE.csv" или просто "training/intel/NYSE.csv" (без начального /) для ссылки на файл относительно вашего текущего каталога.

0 голосов
/ 11 февраля 2020

Это происходит из-за внутреннего сопоставления между каталогами. Сначала go в каталог, где хранится ваш файл (NYSE.csv). Запустите команду:

df -k

Вы получите фактическую точку монтирования каталога. Например: /xyz

Теперь попробуйте найти файл (NYSE.csv) в этой точке монтирования. Например: /xyz/training/intel/NYSE.csv и используйте этот путь в своем коде.

val myrdd = sc.textfile("/xyz/training/intel/NYSE.csv");
...