встретили некоторую путаницу по поводу фильтра.Существует RDD val имен слов следующим образом:
Array(Array('1239423', '42132'), Array('245123', '32412'), ...)
Первый элемент - это идентификатор пользователя, а второй - идентификатор элемента.
И массив действительных идентификаторов элементов в другом значении val trainitemids_value, как показано ниже:
Array('42132', '43123', ...)
Я хочу использовать этот действительный идентификатор, чтобы применить фильтр к словам.Как я понимаю, количество выходных следующих двух методов должно быть одинаковым:
val ids = words.map(line => line(1))
val re = ids.filter(line => trainitemids_value.contains(line))\
или
val re = words.filter(line => trainitemids_value.contains(line(1)))
Но на самом деле все по-другому.Номер метода 1 имеет смысл, потому что он меньше, чем количество оригинальных слов.Результат метода 2 имеет гораздо большее число по сравнению с оригинальными словами.
Я не понимаю, почему число выходных данных фильтра может быть больше, чем у оригинальных коллекций?
Ниже приводится оригиналвывод из моей консоли:
scala> val ids = words.map(line => line(1))
ids: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at map at <console>:28
scala> val re = ids.filter(line => validID.contains(line))
re: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at filter at <console>:42
scala> re.count()
res4: Long = 42548
scala> val re2 = words.filter(line => validID.contains(line(1)))
re2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[16] at filter at <console>:40
scala> re2.count()
res5: Long = 2448569
scala> words.count()
res6: Long = 42549
Согласно ответу @vindev, я попытался кешировать слова RDD.Результат выглядит разумным сейчас.Я до сих пор не до конца понимаю причину.Ниже приводится решение:
scala> val cached = words.cache
cached: words.type = MapPartitionsRDD[13] at map at <console>:26
scala> cached.count()
res7: Long = 42549`
scala> val re3 = cached.filter(line => validID.contains(line(1)))
re3: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[17] at filter at <console>:42
scala> re3.count()
res8: Long = 42548