Я работаю над проблемой MapReduce, в которой я хочу отфильтровать каждый вывод раздела Map. Я хочу отфильтровать только те ключи, которые встречаются больше порогового значения в их разделе карты.
Итак, у меня есть СДР, как (key, value<tuple>)
Для каждого значения кортежа я хочу получить количество его вхождений в СДР, разделенное на разделы карты. Тогда я отфильтрую этот счет.
Eg: RDD: {(key1, ("a","b","c")),
(key2, ("a","d"),
(key3, ("b","c")}
Используя flatMapValues, я смог уменьшить это как
{(key1, a), (key1, b), (key1, c), (key2, a), (key2, d), (key3, b), (key3, c)}
Теперь, используя шаг combineByKey
, я смог получить счетчик каждого значения в соответствующем разделе.
предположим, что было два раздела, тогда он вернется как
("a", [1, 1]), ("b", [1,1]), ("c", [1,1]), ("d", 1)
Теперь я хочу отфильтровать это (ключ, значение) так, чтобы кортеж значений составлял отдельную пару ключ-значение, то есть то, что flatMapValues делало для меня раньше, но я не могу использовать flatMapValues
python
from itertools import count
import pyspark
import sys
import json
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.rdd import RDD
sc = SparkContext('local[*]', 'assignment2_task1')
RDD = sc.textFile(sys.argv[1])
rdd1_original = RDD.map(lambda x: x.split(",")).map(lambda x: (x[0], [x[1]])).reduceByKey(lambda x, y: x + y)
rdd3_candidate = rdd1_original.flatMapValues(lambda a: a).map(lambda x: (x[1], 1)).combineByKey(lambda value: (value),lambda x, y: (x + y), lambda x, y: (x,y))
new_rdd = rdd3_candidate.flatMapValues(lambda a:a)
print(new_rdd.collect())
Ожидаемый ответ:
[("a",1),("a", 1), ("b", 1), ("b", 1), ("c", 1), ("c", 1), ("d", 1)
Текущая ошибка:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
process()
File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 379, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
return f(*args, **kwargs)
File "/Users/yashphogat/Python_Programs/lib/python3.6/site-packages/pyspark/rdd.py", line 1967, in <lambda>
<b>flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
TypeError: 'int' object is not iterable