Я пытаюсь реализовать алгоритм поиска страниц в наборе данных reddit за май 2015 г., но мне не удается извлечь подредакты, на которые есть ссылки в комментариях. Столбец содержит название субредита, а другой содержит комментарий, опубликованный в этом субредите, который ссылается на другой субредит.
subreddit body
videos|"Tagged you as ""...
Quebec|Ok, c'est quoi le...
pokemon|Sorry to hear abo...
videos|Not sure what the...
ClashOfClans|Your submission, ...
realtech|Original /r/techn...
guns|Welp, those basta...
IAmA|If you are very i...
WTF|If you go on /r/w...
Fitness|Your submission h...
gifs|Hi! Take a look a...
Coachella|Yeah. If you go /...
Я сделал следующее:
val df = spark.read
.format("csv")
.option("header", "true")
.load("path\\May2015.csv")
val df1 = df.filter(df("body").contains("/r/")).select("subreddit", "body")
val lines = df1.rdd
val links = lines.map{ s =>
val x = s(1).toString.split(" ")
val b = x.filter(_.startsWith("/r/")).toList
val t = b(0)
(s(0), t)
}.distinct().groupByKey().cache()
var ranks = links.mapValues(v =>0.25)
for (i <- 1 to iters) {
val contribs = links.join(ranks).values.flatMap{ case (urls, rank) =>
val size = urls.size
urls.map(url =(url, rank / size))
}
ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
Проблема в том, что вывод всегда:
(subreddit, CompactBuffer())
В то время как я хочу:
(subreddit, anothersubreddit)
Мне удалось решить эту проблему, но теперь я Получение другой ошибки:
> type mismatch; found : org.apache.spark.rdd.RDD[(String, Double)]
> required: org.apache.spark.rdd.RDD[(Any, Double)] Note: (String,
> Double) <: (Any, Double), but class RDD is invariant in type T. You
> may wish to define T as +T instead. (SLS 4.5)
> ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)