CombineByKey не позволяет работать с flatMapValues ​​для получения пар (Key, Value) - PullRequest
0 голосов
/ 05 июня 2019

Я работаю над проблемой MapReduce, в которой я хочу отфильтровать каждый вывод раздела Map. Я хочу отфильтровать только те ключи, которые встречаются больше порогового значения в их разделе карты.

Итак, у меня есть СДР, как (key, value<tuple>)

Для каждого значения кортежа я хочу получить количество его вхождений в СДР, разделенное на разделы карты. Тогда я отфильтрую этот счет.

Eg: RDD: {(key1, ("a","b","c")), 
          (key2, ("a","d"), 
          (key3, ("b","c")}

Используя flatMapValues, я смог уменьшить это как

{(key1, a), (key1, b), (key1, c), (key2, a), (key2, d), (key3, b), (key3, c)}

Теперь, используя шаг combineByKey, я смог получить счетчик каждого значения в соответствующем разделе.

предположим, что было два раздела, тогда он вернется как

("a", [1, 1]), ("b", [1,1]), ("c", [1,1]), ("d", 1)

Теперь я хочу отфильтровать это (ключ, значение) так, чтобы кортеж значений составлял отдельную пару ключ-значение, то есть то, что flatMapValues ​​делало для меня раньше, но я не могу использовать flatMapValues ​​

python

from itertools import count

import pyspark
import sys
import json
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.rdd import RDD

sc = SparkContext('local[*]', 'assignment2_task1')

RDD = sc.textFile(sys.argv[1])

rdd1_original = RDD.map(lambda x: x.split(",")).map(lambda x: (x[0], [x[1]])).reduceByKey(lambda x, y: x + y)


rdd3_candidate = rdd1_original.flatMapValues(lambda a: a).map(lambda x: (x[1], 1)).combineByKey(lambda value: (value),lambda x, y: (x + y),                                                                                              lambda x, y: (x,y))

new_rdd = rdd3_candidate.flatMapValues(lambda a:a)
print(new_rdd.collect())

Ожидаемый ответ:

[("a",1),("a", 1), ("b", 1), ("b", 1), ("c", 1), ("c", 1), ("d", 1)

Текущая ошибка:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
    process()
  File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 379, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/yashphogat/Downloads/spark-2.33-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "/Users/yashphogat/Python_Programs/lib/python3.6/site-packages/pyspark/rdd.py", line 1967, in <lambda>
    <b>flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
TypeError: 'int' object is not iterable
...