Мы пытаемся перенести базу кода vanilla python в pyspark.Задача состоит в том, чтобы выполнить некоторую фильтрацию на фрейме данных (ранее pandas, теперь - spark), затем сгруппировать его по идентификаторам пользователей и, наконец, применить кластеризацию meanshift сверху.
Я использую pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
для сгруппированных данных.Но теперь есть проблема в том, как должен быть представлен конечный результат.
Допустим, у нас есть два столбца во входном фрейме данных, user-id
и location
.Для каждого пользователя нам нужно получить все кластеры (на location
), сохранить только самый большой и затем вернуть его атрибуты, которые являются трехмерным вектором.Давайте предположим, что столбцы 3-кортежа - col-1
, col-2
и col-3
.Я могу думать только о создании исходного кадра данных с 5 столбцами, с этими 3 полями, установленными на None
, используя что-то вроде withColumn('col-i', lit(None).astype(FloatType()))
.Затем в первой строке для каждого пользователя я планирую заполнить эти три столбца этими атрибутами.Но это кажется действительно уродливым способом, и это излишне тратит много места, потому что кроме первой строки все записи в col-1
, col-2
и col-3
будут равны нулю.В этом случае выходной кадр данных будет выглядеть примерно так:
+---------+----------+-------+-------+-------+
| user-id | location | col-1 | col-2 | col-3 |
+---------+----------+-------+-------+-------+
| 02751a9 | 0.894956 | 21.9 | 31.5 | 54.1 |
| 02751a9 | 0.811956 | null | null | null |
| 02751a9 | 0.954956 | null | null | null |
| ... |
| 02751a9 | 0.811956 | null | null | null |
+--------------------------------------------+
| 0af2204 | 0.938011 | 11.1 | 12.3 | 53.3 |
| 0af2204 | 0.878081 | null | null | null |
| 0af2204 | 0.933054 | null | null | null |
| 0af2204 | 0.921342 | null | null | null |
| ... |
| 0af2204 | 0.978081 | null | null | null |
+--------------------------------------------+
Это выглядит так неправильно.Есть ли элегантный способ сделать это?