Как правильно использовать уменьшить со словарем - PullRequest
0 голосов
/ 05 февраля 2020

Я использую пользовательскую функцию как часть операции уменьшения. В следующем примере я получаю следующее сообщение TypeError: reduce() takes no keyword arguments - я полагаю, это связано с тем, как я использую словарь mapping в функции exposed_colum - Не могли бы вы помочь мне исправить эту функцию?

from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from functools import reduce


def process_data(df: DataFrame):
    col_mapping = dict(zip(["name", "age"], ["a", "b"]))

    # Do other things...

    def exposed_column(df: DataFrame, mapping: dict):
        return df.select([col(c).alias(mapping.get(c, c)) for c in df.columns])

    return reduce(exposed_column, sequence=col_mapping, initial=df)


spark = SparkSession.builder.appName("app").getOrCreate()
l = [
    ("Bob", 25, "Spain"),
    ("Marc", 22, "France"),
    ("Steve", 20, "Belgium"),
    ("Donald", 26, "USA"),
]
rdd = spark.sparkContext.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1]), country=x[2])).toDF()

people.show()
process_data(people).show()

people.show() выглядит так

+---+-------+------+
|age|country|  name|
+---+-------+------+
| 25|  Spain|   Bob|
| 22| France|  Marc|
| 20|Belgium| Steve|
| 26|    USA|Donald|
+---+-------+------+

И это ожидаемый результат

+------+---+
|     a|  b|
+------+---+
|   Bob| 25|
|  Marc| 22|
| Steve| 20|
|Donald| 26|
+------+---+

1 Ответ

1 голос
/ 05 февраля 2020

reduce не принимает ключевые слова, это правда. После удаления ключевых слов вы заметите более серьезную проблему: когда вы перебираете словарь, вы перебираете только его ключи. Таким образом, функция, в которой вы пытаетесь переименовать столбцы в пакетном режиме, не будет делать то, что вы имели в виду.

Один из способов переименовать столбец в пакетном режиме - это выполнить итерацию по словарю items:

from typing import Mapping
from pyspark.sql import DataFrame

def rename_columns(frame: DataFrame, mapping: Mapping[str, str]) -> DataFrame:
    return reduce(lambda f, old_new: f.withColumnRenamed(old_new[0], old_new[1]),
                  mapping.items(), frame)

Это позволяет вам передавать в словарь (обратите внимание, что рекомендация для добавления подсказок типа к аргументам заключается в использовании Mapping, а не dict), который отображает имена столбцов на другие имена. К счастью, withColumnRenamed не будет жаловаться, если вы попытаетесь переименовать столбец, которого нет в DataFrame, так что это эквивалентно вашему mapping.get(c, c).

Одна вещь, которую я не замечаю в Ваш код состоит в том, что он отбрасывает столбец country. Так что это все еще будет в вашем выводе.

...