невозможно создать фрейм данных из файла последовательности в Spark, созданного Sqoop - PullRequest
0 голосов
/ 10 ноября 2018

Я хочу прочитать orders данные и создать из них СДР, который сохраняется как файл sequence в hadoop fs в cloudera vm. Ниже приведены мои шаги:

1) Импорт данных заказов в виде файла последовательности:

sqoop import --connect jdbc:mysql://localhost/retail_db --username retail_dba --password cloudera  --table orders -m 1 --target-dir /ordersDataSet --as-sequencefile   

2) Чтение файла в spark scala:

Spark 1.6

val sequenceData=sc.sequenceFile("/ordersDataSet",classOf[org.apache.hadoop.io.Text],classOf[org.apache.hadoop.io.Text]).map(rec => rec.toString())  

3) Когда я пытаюсь прочитать данные из вышеупомянутого СДР, выдается следующее сообщение об ошибке:

Caused by: java.io.IOException: WritableName can't load class: orders
    at org.apache.hadoop.io.WritableName.getClass(WritableName.java:77)
    at org.apache.hadoop.io.SequenceFile$Reader.getValueClass(SequenceFile.java:2108)
    ... 17 more
Caused by: java.lang.ClassNotFoundException: Class orders not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2185)
    at org.apache.hadoop.io.WritableName.getClass(WritableName.java:75)
    ... 18 more

Я не знаю, почему он говорит, что не может найти заказы. Куда я иду не так?

Я также сослался на коды из этих двух ссылок, но не повезло:
1) См. Последовательность часть
2) См. Шаг №. 8

Ответы [ 2 ]

0 голосов
/ 10 ноября 2018

Я разобрался с решением своей проблемы. Ну, я собираюсь написать длинное решение, но я надеюсь, что оно будет иметь некоторый смысл.

1) Когда я пытался прочитать данные, которые были импортированы в HDFS с использованием SQOOP, выдает ошибку по следующим причинам:

A) Файл последовательности - все о key-value pair. Поэтому, когда я импортирую его с помощью sqoop, импортируемые данные находятся не в паре ключ-значение, поэтому при чтении выдает ошибку.
Б) Если вы попытаетесь прочитать few characters, из которого вы сможете определить two classes, необходимый для передачи в качестве ввода при чтении файла последовательности, вы получите данные, как показано ниже:

[cloudera@quickstart ~]$ hadoop fs -cat /user/cloudera/problem5/sequence/pa* | head -c 300
SEQ!org.apache.hadoop.io.LongWritableorders�;�M��c�K�����@���-OCLOSED@���PENDING_PAYMENT@���/COMPLETE@���"{CLOSED@���cat: Unable to write to output stream.  

Выше вы можете видеть только one class, т.е. org.apache.hadoop.io.LongWritable, и когда я пропускаю это во время чтения данных последовательности, он выдает ошибку, которая упоминается в посте.

val sequenceData=sc.sequenceFile("/ordersDataSet",classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.LongWritable]).map(rec => rec.toString())  

Я не думаю, что точка B является основной причиной этой ошибки, но я очень уверен, что точка A является реальной причиной этой ошибки.

2) Ниже описано, как я решил свою проблему.

Я импортировал данные в виде файла avro data в другом месте назначения, используя SQOOP. Затем я создал датафрейм из avro, используя следующие способы:

scala> import com.databricks.spark.avro._;
scala> val avroData=sqlContext.read.avro("path")  

Теперь я создал key-value pair и сохранил его как sequence file

avroData.map(p=>(p(0).toString,(p(0)+"\t"+p(1)+"\t"+p(2)+"\t"+p(3)))).saveAsSequenceFile("/user/cloudera/problem5/sequence")  

Теперь, когда я пытаюсь прочитать few символов вышеописанного файла, это дает мне two classes, что мне нужно при чтении файла, как показано ниже:

[cloudera@quickstart ~]$ hadoop fs -cat /user/cloudera/problem5/sequence/part-00000 | head -c 300
SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text^#%���8P���11  1374735600000   11599   CLOSED&2#2  1374735600000   256 PENDING_PAYMENT!33  1374735600000   12111   COMPLETE44  1374735600000   8827    CLOSED!55   1374735600000   11318   COMPLETE 66 1374cat: Unable to write to output stream.  

scala> val sequenceData=sc.sequenceFile("/user/cloudera/problem5/sequence",classOf[org.apache.hadoop.io.Text],classOf[org.apache.hadoop.io.Text]).map(rec=>rec.toString)
sequenceData: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at map at <console>:30

Теперь, когда я пытаюсь распечатать данные, отображаются данные, как показано ниже:

scala> sequenceData.take(4).foreach(println)
(1,1    1374735600000   11599   CLOSED)
(2,2    1374735600000   256 PENDING_PAYMENT)
(3,3    1374735600000   12111   COMPLETE)
(4,4    1374735600000   8827    CLOSED)

И последнее, но не менее важное. Спасибо всем за ваши ценные усилия. Ура !!

0 голосов
/ 10 ноября 2018

sqoop имеет мало общего с этим, вот пример более реалистичного сценария, в котором saveAsSequenceFile всегда принимает k, v пар - это может помочь вам:

import org.apache.hadoop.io._

val RDD = sc.parallelize( List( (1, List("A", "B")) , (2, List("B", "C")) , (3, List("C", "D", "E")) ) )
val RDD2 = RDD.map(x => (x._1, x._2.mkString("/")))
RDD2.saveAsSequenceFile("/rushhour/seq-directory/2")

val sequence_data = sc.sequenceFile("/rushhour/seq-directory/*", classOf[IntWritable], classOf[Text])
                  .map{case (x, y) => (x.get(), y.toString().split("/")(0), y.toString().split("/")(1))}

sequence_data.collect

возвращается:

res20: Array[(Int, String, String)] = Array((1,A,B), (2,B,C), (3,C,D), (1,A,B), (2,B,C), (3,C,D))

Я не уверен, хотите ли вы RDD или DF, но конвертация RDD в DF, конечно, тривиальна.

...