Как передать несколько столбцов в качестве параметров в pyspark write repartition () - PullRequest
2 голосов
/ 22 октября 2019

Я создал функцию, которая записывает фрейм данных в местоположение s3 с некоторыми параметрами. Все работает, за исключением параметра переразметки списка. Сбой с ошибкой: raise TypeError("numPartitions should be an int or Column")

Год - это столбец типа int, а дата - столбец типа sate. Когда я жестко кодирую 2 столбца как .repartition("parti_create_date", "parti_hour"), это работает. Я попытался предоставить их в виде списка, строки и столбца. Кажется, ничего не работает.

parti_list = ["parti_year", "parti_create_date", "parti_hour"]
re_parti_list = ["parti_create_date", "parti_hour"]

def spark_write(in_df, write_tgt_loc, parti_list, re_parti_list, tgt_file_format, write_mode, tgt_file_compression):
(in_df
            .repartition(re_parti_list)  #(re_parti_str)
            .write
            .partitionBy(parti_str)
            .mode(write_mode).format(tgt_file_format)
            .option('compression', tgt_file_compression).option("nullValue", "null").option("treatEmptyValuesAsNulls,", "true")
            .save(write_tgt_loc))

spark_write(tgt_df, "s3://bucket/out/", parti_list, re_parti_list, "parquet", "overwrite","snappy")

Не могли бы вы помочь мне разобраться, как передать столбцы повторного разбиения в качестве параметров в PySpark?

1 Ответ

2 голосов
/ 22 октября 2019

Перераспределение ожидает either int or column, поэтому нам нужно передать col("<col_name>") на фрейм данных.

Example:

df=spark.createDataFrame([(1,'a',),(2,'b',),(3,'c',)],['id','name'])
df.rdd.getNumPartitions() #number of partitions in df
1

Repartition on int:

df.repartition(10).rdd.getNumPartitions() #repartition to 10 

10

Repartition on columns:

df.repartition(col("id"),col("name")).rdd.getNumPartitions() #repartition on columns

200

Dynamic repartition on columns:

df.repartition(*[col(c) for c in df.columns]).rdd.getNumPartitions()

200

картазатем введите columns list - column вместо string, а затем передайте имена столбцов в repartition.

For your case try this way:

df.repartition(*[col(c) for c in re_parti_list])
            .write
            .partitionBy(parti_str)
            .mode(write_mode).format(tgt_file_format)
            .option('compression', tgt_file_compression).option("nullValue", "null").option("treatEmptyValuesAsNulls,", "true")
            .save(write_tgt_loc))

In scala:

df.repartition(df.columns.map(c => col(c)):_*).rdd.getNumPartitions
200
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...