Разделение определенного столбца PySpark df и создание другого DF - PullRequest
0 голосов
/ 31 октября 2018

У меня есть фрейм данных (скажем, ac_df), который имеет 32 разных столбца. Я хочу получить определенный столбец и разделить значения на кусок 3 как одно новое значение и создать из него еще одну df.

ac_df['payment_history_1'] дает результат ниже

enter image description here

Я хочу новый df, который имеет структуру ниже.

enter image description here

Например: если я возьму первый ряд '000000000000', он будет сгруппирован как

'000', '000', '000', '000'

и это создает первый ряд нового df.

Python-эквивалентный код для выполнения этой задачи приведен ниже:

temp1 = ac_df['payment_history_1'].str.split(r'(...)', expand=True)

В спарк я попробовал что-то ниже:

temp1 = ac_df.select(ac_df['payment_history_1']).rdd.map(lambda each_row: str(each_row[0])).map(lambda y: y.split(r'(...)')).collect()

Выход:

 [['000000000000'], ['000000000003000000000'], ['000000000003000000000000000']]

Однако я не могу двигаться вперед и получить желаемый результат. Кто-нибудь может подсказать?

1 Ответ

0 голосов
/ 02 ноября 2018

Попробуйте, вы сможете построить над этим:

df = spark.createDataFrame(
        [
            [1, '000000000000'], 
            [2, '000000000003000000000'], 
            [3, '000000000003000000000000000']
        ]
        , ["id", "numbers"]
        )


df.show()

Должно привести к чему-то похожему на тот кадр данных, с которого вы начинаете:

+---+--------------------+
| id|             numbers|
+---+--------------------+
|  1|        000000000000|
|  2|00000000000300000...|
|  3|00000000000300000...|
+---+--------------------+

взяв столбец чисел, вы сможете разобрать его в разделенную "," строку, откуда мы можем применить: posexplode (expr) - Разделяет элементы массива expr на несколько строк с позициями, или элементы карты expr в несколько строк и столбцов с позициями.

df.select(
    "id",
    f.split("numbers", ",").alias("numbers"),
    f.posexplode(f.split("numbers", ",")).alias("pos", "val")
).show()

, что должно привести к:

+---+--------------------+---+---+
| id|             numbers|pos|val|
+---+--------------------+---+---+
|  1|[000, 000, 000, 000]|  0|000|
|  1|[000, 000, 000, 000]|  1|000|
|  1|[000, 000, 000, 000]|  2|000|
|  1|[000, 000, 000, 000]|  3|000|
|  2|[000, 000, 000, 0...|  0|000|
|  2|[000, 000, 000, 0...|  1|000|
|  2|[000, 000, 000, 0...|  2|000|
|  2|[000, 000, 000, 0...|  3|003|
|  2|[000, 000, 000, 0...|  4|000|
|  2|[000, 000, 000, 0...|  5|000|
|  2|[000, 000, 000, 0...|  6|000|
|  3|[000, 000, 000, 0...|  0|000|
|  3|[000, 000, 000, 0...|  1|000|
|  3|[000, 000, 000, 0...|  2|000|
|  3|[000, 000, 000, 0...|  3|003|
|  3|[000, 000, 000, 0...|  4|000|
|  3|[000, 000, 000, 0...|  5|000|
|  3|[000, 000, 000, 0...|  6|000|
|  3|[000, 000, 000, 0...|  7|000|
|  3|[000, 000, 000, 0...|  8|000|
+---+--------------------+---+---+

Далее мы используем: pyspark.sql.functions.expr, чтобы получить элемент с индексом pos в этом массиве.

Первый - это имя нашего нового столбца, который будет объединением числа и индекса в массиве. Второй столбец будет значением соответствующего индекса в массиве. Мы получаем последнее, используя функциональность pyspark.sql.functions.expr, которая позволяет нам использовать значения столбцов в качестве параметров.

df.select(
    "id",
    f.split("numbers", ",").alias("numbers"),
    f.posexplode(f.split("numbers", ",")).alias("pos", "val")
)\
.drop("val")\
.select(
    "id",
    f.concat(f.lit("numbers"),f.col("pos").cast("string")).alias("number"),
    f.expr("numbers[pos]").alias("val")
)\
.show()

что дает:

+---+--------+---+
| id|  number|val|
+---+--------+---+
|  1|numbers0|000|
|  1|numbers1|000|
|  1|numbers2|000|
|  1|numbers3|000|
|  2|numbers0|000|
|  2|numbers1|000|
|  2|numbers2|000|
|  2|numbers3|003|
|  2|numbers4|000|
|  2|numbers5|000|
|  2|numbers6|000|
|  3|numbers0|000|
|  3|numbers1|000|
|  3|numbers2|000|
|  3|numbers3|003|
|  3|numbers4|000|
|  3|numbers5|000|
|  3|numbers6|000|
|  3|numbers7|000|
|  3|numbers8|000|
+---+--------+---+

Наконец, мы можем просто сгруппировать идентификатор и развернуть DataFrame

df.select(
    "id",
    f.split("numbers", ",").alias("numbers"),
    f.posexplode(f.split("numbers", ",")).alias("pos", "val")
)\
.drop("val")\
.select(
    "id",
    f.concat(f.lit("numbers"),f.col("pos").cast("string")).alias("number"),
    f.expr("numbers[pos]").alias("val")
)\
.groupBy("id").pivot("number").agg(f.first("val"))\
.show()

с указанием окончательного фрейма данных:

enter image description here

забрал детали у: Разделить столбец строки Spark Dataframe на несколько столбцов

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...