Разверните массив структур в столбцы в PySpark - PullRequest
0 голосов
/ 07 декабря 2018

У меня есть фрейм данных Spark, созданный в Google Analytics, который выглядит следующим образом:

id     customDimensions (Array<Struct>)
100    [ {"index": 1, "value": "Earth"}, {"index": 2, "value": "Europe"}]
101    [ {"index": 1, "value": "Mars" }]

У меня также есть фрейм данных "метаданные пользовательских измерений", который выглядит следующим образом:

index   name
1       planet
2       continent

Я бы использовал индексы в метаданных df для расширения моих пользовательских измерений в столбцы.Результат должен выглядеть следующим образом:

id     planet     continent
100    Earth      Europe
101    Mars       null

Я попробовал следующий подход, и он отлично работает, однако он крайне неэффективен.Я хотел бы знать, есть ли лучший подход.

# Select the two relevant columns
cd = df.select('id', 'customDimensions')

# Explode customDimensions so that each row now has a {index, value}
cd = cd.withColumn('customDimensions', F.explode(cd.customDimensions))

# Put the index and value into their own columns
cd = cd.select('id', 'customDimensions.index', 'customDimensions.value')

# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = (cd
         .join(metadata, cd.index == metadata.index, 'left')
         .drop(metadata.index))

# Pivot cd so that each row has the id, and we have columns for each custom dimension
piv = cd.groupBy('id').pivot('name').agg(F.first(F.col('value')))

# Join back to restore the other columns
return df.join(piv, df.id == piv.id).drop(piv.id)

Допущения:

  • Существует до 250 пользовательских индексов измерений, и имена известны толькочерез фрейм метаданных
  • В оригинальном фрейме данных есть несколько других столбцов, которые я хотел бы сохранить (следовательно, соединение в конце моего решения)

1 Ответ

0 голосов
/ 07 декабря 2018

Объединения являются очень дорогостоящей операцией, потому что это приводит к перестановке данных.Если вы можете, вы должны избегать этого или стремиться оптимизировать его.

В вашем коде есть два объединения.Последнее соединение вернуть столбцы можно вообще избежать.Другое соединение с метаданными может быть оптимизировано.Поскольку метаданные df имеют только 250 строк и очень малы, вы можете использовать подсказку broadcast() в соединении.Это позволило бы избежать перетасовки большого кадра данных.

Я сделал некоторые предлагаемые изменения кода, но он не тестировался, поскольку у меня нет ваших данных.

# df columns list
df_columns = df.columns

# Explode customDimensions so that each row now has a {index, value}
cd = df.withColumn('customDimensions', F.explode(cd.customDimensions))

# Put the index and value into their own columns
cd = cd.select(*df_columns, 'customDimensions.index', 'customDimensions.value')

# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = cd.join(broadcast(metadata), "index", 'left')

# Pivot cd so that each row has the id, and we have columns for each custom dimension
piv = cd.groupBy(df_columns).pivot('name').agg(F.first(F.col('value')))


return piv
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...