Как разделить значения двух столбцов с другим именем в sqlcontext? - PullRequest
0 голосов
/ 11 июня 2019

У меня есть файл с названием tagupdate (UserId, MovieId, Tag), а также таблица состоит из (MovieId, Tag, вхождения, количества) изменить имя MovieId, тег, вхождение в качестве eachTagCount, считать как totalcount. Я хочу разделить значение eachTagCount / totalCount с новым именем как tagScore. Как написать этот запрос? val tagScore = sqlContext.sql («ВЫБЕРИТЕ MovieId, Tag, (eachTagCount / totalCount) КАК tagScore ОТ окончательного результата»)

val finalresult = sqlContext.sql ("ВЫБЕРИТЕ MovieId, Tag, вхождение AS eachTagCount, подсчет AS totalCount ОТ результата ORDER BY MovieId")

finalresult.rdd
  .map(_.toSeq.map(_+"").reduce(_+","+_))
  .saveAsTextFile("/usr/local/spark/dataset/algorithm3/output5")
  case class fine(  MovieId:Int,Tag:String,occurrence:Int,count:Int)
  val Data5 = sc.textFile("file:///usr/local/spark/dataset/algorithm3/output5").map(_.split(",")).map(p => fine(p(0).trim.toInt,p(1),p(2).trim.toInt,p(3).trim.toInt)).toDF()
Data5.registerTempTable("finalresult")


  val tagScore = sqlContext.sql("SELECT MovieId,Tag,( eachTagCount / totalCount) AS tagScore FROM finalresult")

   tagScore.rdd
  .map(_.toSeq.map(_+"").reduce(_+","+_))
  .saveAsTextFile("/usr/local/spark/dataset/algorithm3/output6")
def main(args: Array[String]) {
     val ratings= sc.textFile("file:///usr/local/spark/dataset/tagupdate") 
   .map(line => line.split(";"))
     .map(userRecord => (userRecord(0),
     userRecord(1), userRecord(2),userRecord(3)))
     val ratingsRDD = sc.textFile("file:///usr/local/spark/dataset/tagupdate")

}
}

Исключение

Caused by: org.apache.spark.sql.AnalysisException: Cannot resolve column name " eachTagCount";
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
at scala.Option.getOrElse(Option.scala:121)

1 Ответ

2 голосов
/ 14 июня 2019

если вы посмотрите на эту часть вашего кода

case class fine(  MovieId:Int,Tag:String,occurrence:Int,count:Int)
val Data5 = sc.textFile("file:///usr/local/spark/dataset/algorithm3  /output5").map(_.split(",")).map(p => fine(p(0).trim.toInt,p(1),p(2).trim.toInt,p(3).trim.toInt)).toDF()
Data5.registerTempTable("finalresult")
val tagScore = sqlContext.sql("SELECT MovieId,Tag,( eachTagCount / totalCount) AS tagScore FROM finalresult")

выше, Data5 преобразуется в класс case .. если вы выполните printchema для data5, у вас будет что-то вроде этого

|-- MovieId
|-- Tag
|-- occurrence
|-- count

когда вы регистрируете эту таблицу как временную и запускаете select, очевидно, он не найдет столбцы eachTagCount и totalCount.растание = eachTagCount и count = totalCount

, затем просто измените ваш оператор выбора ниже

 val tagScore = sqlContext.sql("SELECT MovieId,Tag,(occurrence/count) AS tagScore FROM finalresult")

Надеюсь, что это решит вашу проблему, и это очень неэффективный способ написания вашего кода.Если вы изучаете, то это нормально .. imho Подсказка: вы можете просто читать файлы в формате csv, используя spark.read.csv.это исключит процесс rdd.map, так как для вас пишет, что вы также можете использовать df.write.csv (путь).сделает ваш код менее запутанным.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...