pyspark rdd принимает максимальную частоту с наименьшим возрастом - PullRequest
2 голосов
/ 25 марта 2020

У меня есть rdd вроде следующего:

[{'age': 2.18430371791803,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {'age': 2.80033330216659,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {'age': 2.8222365762732,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {...}]

Я пытаюсь уменьшить каждый идентификатор до 1 записи, взяв код с самой высокой частотой, используя такой код:

rdd.map(lambda x: (x["id"], [(x["age"], x["code"])]))\
.reduceByKey(lambda x, y: x + y)\
.map(lambda x: [i[1] for i in x[1]])\
.map(lambda x: [max(zip((x.count(i) for i in set(x)), set(x)))])

Есть одна проблема с этой реализацией, она не учитывает возраст, поэтому, если, например, один идентификатор имеет несколько кодов с частотой 2, потребуется последний код.

Для иллюстрации этой проблемы, пожалуйста, рассмотрите это уменьшенный идентификатор:

(u'"000PZ7S2G"',
 [(4.3218651186303, u'"388.400000"'),
  (4.34924421126357, u'"388.400000"'),
  (4.3218651186303, u'"389.900000"'),
  (4.34924421126357, u'"389.900000"'),
  (13.3667102491139, u'"794.310000"'),
  (5.99897016368982, u'"995.300000"'),
  (6.02634923989903, u'"995.300000"'),
  (4.3218651186303, u'"V72.19"'),
  (4.34924421126357, u'"V72.19"'),
  (13.3639723398581, u'"V81.2"'),
  (13.3667102491139, u'"V81.2"')])

мой код будет выводить:

[(2, u'"V81.2"')]

, когда я хотел бы вывести:

[(2, u'"388.400000"')]

, потому что, хотя частота то же самое для обоих этих кодов, код 388.400000 имеет меньший возраст и появляется первым.

, добавив эту строку после .reduceByKey ():

.map(lambda x: (x[0], [i for i in x[1] if i[0] == min(x[1])[0]]))

Я могу отфильтровать Из тех, у кого минимальный возраст больше, но тогда я рассматриваю только тех, у кого минимальный возраст, а не все коды для расчета их частоты. Я не могу применить те же / похожие логи c после [max (zip ((x.count (i) для i в set (x)), set (x)))], поскольку set (x) является набор х [1], который не учитывает возраст.

Я должен добавить, я не хочу просто брать первый код с самой высокой частотой, я хотел бы взять самую высокую частоту код с наименьшим возрастом или код, который появляется первым, если это возможно, используя только действия rdd.

эквивалентный код в SQL того, что я пытаюсь получить, будет что-то вроде:

SELECT code, count(*) as code_frequency
FROM (SELECT id, code, age
FROM (SELECT id, code, MIN(age) AS age, COUNT(*) as cnt,
             ROW_NUMBER() OVER (PARTITION BY id ORDER BY COUNT(*) DESC, MIN(age)) as seqnum
      FROM tbl
      GROUP BY id, code
     ) t
WHERE seqnum = 1) a
GROUP BY code
ORDER by code_frequency DESC
LIMIT 5;

и как DF (хотя и стараюсь избегать этого):

wc = Window().partitionBy("id", "code").orderBy("age")
wc2 = Window().partitionBy("id")
df = rdd.toDF()
df = df.withColumn("count", F.count("code").over(wc))\
.withColumn("max", F.max("count").over(wc2))\
.filter("count = max")\
.groupBy("id").agg(F.first("age").alias("age"),
                           F.first("code").alias("code"))\
.orderBy("id")\
.groupBy("code")\
.count()\
.orderBy("count", ascending = False)

Буду очень признателен за любую помощь с этим.

Ответы [ 2 ]

1 голос
/ 13 апреля 2020

Исходя из SQL эквивалента вашего кода, я преобразовал лог c в следующий rdd1 плюс некоторую постобработку (начиная с исходного СДР):

rdd = sc.parallelize([{'age': 4.3218651186303, 'code': '"388.400000"', 'id': '"000PZ7S2G"'},
 {'age': 4.34924421126357, 'code': '"388.400000"', 'id': '"000PZ7S2G"'},
 {'age': 4.3218651186303, 'code': '"389.900000"', 'id': '"000PZ7S2G"'},
 {'age': 4.34924421126357, 'code': '"389.900000"', 'id': '"000PZ7S2G"'},
 {'age': 13.3667102491139, 'code': '"794.310000"', 'id': '"000PZ7S2G"'},
 {'age': 5.99897016368982, 'code': '"995.300000"', 'id': '"000PZ7S2G"'},
 {'age': 6.02634923989903, 'code': '"995.300000"', 'id': '"000PZ7S2G"'},
 {'age': 4.3218651186303, 'code': '"V72.19"', 'id': '"000PZ7S2G"'},
 {'age': 4.34924421126357, 'code': '"V72.19"', 'id': '"000PZ7S2G"'},
 {'age': 13.3639723398581, 'code': '"V81.2"', 'id': '"000PZ7S2G"'},
 {'age': 13.3667102491139, 'code': '"V81.2"', 'id': '"000PZ7S2G"'}])

rdd1 = rdd.map(lambda x: ((x['id'], x['code']),(x['age'], 1))) \
    .reduceByKey(lambda x,y: (min(x[0],y[0]), x[1]+y[1])) \
    .map(lambda x: (x[0][0], (-x[1][1] ,x[1][0], x[0][1]))) \
    .reduceByKey(lambda x,y: x if x < y else y) 
# [('"000PZ7S2G"', (-2, 4.3218651186303, '"388.400000"'))]

Где:

  1. использовать map для инициализации пары-RDD с ключом = (x['id'], x['code']), значение = (x['age'], 1)
  2. использовать reduceByKey для расчета min_age и count
  3. используйте map для сброса парного СДР с ключом = id и значением = (-count, min_age, code)
  4. используйте reduceByKey для найдите минимальное значение кортежей (-count, min_age, code) для того же id

Вышеуказанные шаги аналогичны следующим:

  • Шаг (1) + (2): groupby('id', 'code').agg(min('age'), count())
  • Шаг (3) + (4): groupby('id').agg(min(struct(negative('count'),'min_age','code')))

Затем вы можете получить производную таблицу a в вашем SQL, выполнив rdd1.map(lambda x: (x[0], x[1][2], x[1][1])), но этот шаг не является необходимым. code может быть посчитан непосредственно из вышеупомянутого rdd1 с помощью другой функции карты + метода countByKey () и затем отсортирован результат:

sorted(rdd1.map(lambda x: (x[1][2],1)).countByKey().items(), key=lambda y: -y[1])
# [('"388.400000"', 1)]

Однако, если вы ищете сумму (количество) через все id с, затем выполните следующее:

rdd1.map(lambda x: (x[1][2],-x[1][0])).reduceByKey(lambda x,y: x+y).collect()
# [('"388.400000"', 2)]
0 голосов
/ 28 марта 2020

Если вариант преобразования rdd в массив данных является опцией, я думаю, что этот подход может решить вашу проблему:

from pyspark.sql.functions import row_number, col
from pyspark.sql import Window
df = rdd.toDF()
w = Window.partitionBy('id').orderBy('age')
df = df.withColumn('row_number', row_number.over(w)).where(col('row_number') == 1).drop('row_number')
...