У меня есть rdd вроде следующего:
[{'age': 2.18430371791803,
'code': u'"315.320000"',
'id': u'"00008RINR"'},
{'age': 2.80033330216659,
'code': u'"315.320000"',
'id': u'"00008RINR"'},
{'age': 2.8222365762732,
'code': u'"315.320000"',
'id': u'"00008RINR"'},
{...}]
Я пытаюсь уменьшить каждый идентификатор до 1 записи, взяв код с самой высокой частотой, используя такой код:
rdd.map(lambda x: (x["id"], [(x["age"], x["code"])]))\
.reduceByKey(lambda x, y: x + y)\
.map(lambda x: [i[1] for i in x[1]])\
.map(lambda x: [max(zip((x.count(i) for i in set(x)), set(x)))])
Есть одна проблема с этой реализацией, она не учитывает возраст, поэтому, если, например, один идентификатор имеет несколько кодов с частотой 2, потребуется последний код.
Для иллюстрации этой проблемы, пожалуйста, рассмотрите это уменьшенный идентификатор:
(u'"000PZ7S2G"',
[(4.3218651186303, u'"388.400000"'),
(4.34924421126357, u'"388.400000"'),
(4.3218651186303, u'"389.900000"'),
(4.34924421126357, u'"389.900000"'),
(13.3667102491139, u'"794.310000"'),
(5.99897016368982, u'"995.300000"'),
(6.02634923989903, u'"995.300000"'),
(4.3218651186303, u'"V72.19"'),
(4.34924421126357, u'"V72.19"'),
(13.3639723398581, u'"V81.2"'),
(13.3667102491139, u'"V81.2"')])
мой код будет выводить:
[(2, u'"V81.2"')]
, когда я хотел бы вывести:
[(2, u'"388.400000"')]
, потому что, хотя частота то же самое для обоих этих кодов, код 388.400000 имеет меньший возраст и появляется первым.
, добавив эту строку после .reduceByKey ():
.map(lambda x: (x[0], [i for i in x[1] if i[0] == min(x[1])[0]]))
Я могу отфильтровать Из тех, у кого минимальный возраст больше, но тогда я рассматриваю только тех, у кого минимальный возраст, а не все коды для расчета их частоты. Я не могу применить те же / похожие логи c после [max (zip ((x.count (i) для i в set (x)), set (x)))], поскольку set (x) является набор х [1], который не учитывает возраст.
Я должен добавить, я не хочу просто брать первый код с самой высокой частотой, я хотел бы взять самую высокую частоту код с наименьшим возрастом или код, который появляется первым, если это возможно, используя только действия rdd.
эквивалентный код в SQL того, что я пытаюсь получить, будет что-то вроде:
SELECT code, count(*) as code_frequency
FROM (SELECT id, code, age
FROM (SELECT id, code, MIN(age) AS age, COUNT(*) as cnt,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY COUNT(*) DESC, MIN(age)) as seqnum
FROM tbl
GROUP BY id, code
) t
WHERE seqnum = 1) a
GROUP BY code
ORDER by code_frequency DESC
LIMIT 5;
и как DF (хотя и стараюсь избегать этого):
wc = Window().partitionBy("id", "code").orderBy("age")
wc2 = Window().partitionBy("id")
df = rdd.toDF()
df = df.withColumn("count", F.count("code").over(wc))\
.withColumn("max", F.max("count").over(wc2))\
.filter("count = max")\
.groupBy("id").agg(F.first("age").alias("age"),
F.first("code").alias("code"))\
.orderBy("id")\
.groupBy("code")\
.count()\
.orderBy("count", ascending = False)
Буду очень признателен за любую помощь с этим.