Не упоминается, если необходимы преобразования RDD
. Итак, я собираюсь go вперед и дам ответ с DataFrame
преобразованиями. Результирующий DataFrame
может быть преобразован в RDD
при необходимости (указан внизу).
Используя подмножество ваших данных
data = [
[1,'vikram','Technology',30000],
[2,'vivek','Sales',20000],
[1,'mushahid','Sales',10000]
]
data_sdf = spark.sparkContext.parallelize(data).toDF(['id', 'name', 'foo', 'bar'])
# +---+--------+----------+-----+
# | id| name| foo| bar|
# +---+--------+----------+-----+
# | 1| vikram|Technology|30000|
# | 2| vivek| Sales|20000|
# | 1|mushahid| Sales|10000|
# +---+--------+----------+-----+
# count the "id" occurrances
id_counts = data_sdf.groupBy('id'). \
agg(func.count('*').alias('id_cnt')). \
filter(func.col('id_cnt') > 1)
dup_ids = [k.id for k in id_counts.collect()]
# [1]
# data with duplicate ids
dup_id_data_sdf = data_sdf. \
filter(func.col('id').isin(dup_ids))
# +---+--------+----------+-----+
# | id| name| foo| bar|
# +---+--------+----------+-----+
# | 1| vikram|Technology|30000|
# | 1|mushahid| Sales|10000|
# +---+--------+----------+-----+
# data with no duplicate ids
nodup_id_data_sdf = data_sdf. \
filter(func.col('id').isin(dup_ids) == False)
# +---+-----+-----+-----+
# | id| name| foo| bar|
# +---+-----+-----+-----+
# | 2|vivek|Sales|20000|
# +---+-----+-----+-----+
Для преобразования в RDD
dup_id_data_rdd = dup_id_data_sdf.rdd
nodup_id_data_rdd = nodup_id_data_sdf.rdd