Соответствие строк в столбцах PySpark DataFrame - PullRequest
0 голосов
/ 29 сентября 2018

Я пытаюсь стандартизировать имена заголовков моих DataFrames с учетом справочной таблицы.

Моя справочная таблица - это DataFrame с переменными в строках, а также стандартные и все возможные имена вариантов в виде столбцов:

+-------------+---------+---------+
|Standard_name|Variant_1|Variant_2|
+-------------+---------+---------+
|     Pressure|    Press|  Press_1|
|        Speed|   Speed_|     Rate|
+-------------+---------+---------+

Скажем, у меня есть DataFrame данных с этими именами столбцов:

['Pressure', 'Rate', 'Altitude']

Я хочу найти каждое из этих имен переменных в моем ссылочном DataFrame, вернуть соответствующее Standard_name, если оно существует, или сохранить исходную переменную, если на нее еще нет ссылок в таблице.

Таким образом, ожидаемый результат фиктивного примера выше должен быть:

[Pressure, 'Speed', Altitude]

Это легко сделать в обычных Python-пандах, но я понятия не имею, как это сделать в Spark, где вы не должныдумать в терминах индексов строк.

Большое спасибо заранее за помощь.

1 Ответ

0 голосов
/ 05 октября 2018

Хотя я согласен с комментарием mayank agrawal выше, я попытался решить эту проблему в Spark.

Я адаптировал это решение , чтобы извлечь все попарные соответствия каждого варианта со стандартным именем вбольшой словарь.Затем я сопоставил словарь с заголовками набора данных, чтобы создал новый столбец стандартизированных заголовков.

Таким образом, решение:

from pyspark.sql import Row
from pyspark.sql.types import *
import pyspark.sql.functions as F
from itertools import chain


key_value_map = F.udf(lambda maps: {key:f[key] for f in maps for key in f},
    MapType(StringType(),StringType()))


map_df = variable_df
    .agg(F.collect_list(F.create_map(list(chain.from_iterable([[key, 'Standard'] for key in var_df.columns[2:]])))).alias('maps')) 
    .agg(F.collect_list(key_value_map('maps')))

result_dict = map_df.collect()  
ref_dict = result_dict[0][0][0]

corresp_df = header_df
    .withColumn('new_header', F.create_map([F.lit(x) for x in chain(*ref_dict.items())]).getItem(F.col('old_header')))    
    .withColumn("new_header", F.coalesce(F.col('new_header'), F.col('old_header')))

new_columns = corresp_df.select('new_header').rdd.flatMap(lambda row : row).collect()

renamed_df = data_df.toDF(*new_columns)

Ссылки:

Pyspark датафрейма для диктовки

Pyspark создает новый столбец с отображением из dict

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...