Вращение значений строки в pyspark - PullRequest
0 голосов
/ 06 февраля 2020

В настоящее время я работаю над очисткой набора данных и пытаюсь сделать это с помощью pyspark. Данные считываются в фрейм данных из CSV, и значения, которые мне нужны, находятся в соответствующих строках, но для некоторых строк значения перемешиваются. Мне нужно повернуть значения этих строк, чтобы значения были в правильных столбцах. Например, допустим, у меня есть следующий набор данных:

+-------+-------+-------+
|   A   |   B   |   C   |
+-------+-------+-------+
|   2   |   3   |   1   |
+-------+-------+-------+

Но значения в первой строке должны быть

+-------+-------+-------+
|   A   |   B   |   C   |
+-------+-------+-------+
|   1   |   2   |   3   |
+-------+-------+-------+

Мое текущее решение - добавить временный столбец и переназначить значения каждому столбцу и переименовать временный столбец, удаляя старый:

// Add temporary column C
+-------+-------+-------+-------+
|   A   |   B   |   C   | tmp_C |
+-------+-------+-------+-------+
|   2   |   3   |   1   |   1   |
+-------+-------+-------+-------+
// Shift values
+-------+-------+-------+-------+
|   A   |   B   |   C   | tmp_C |
+-------+-------+-------+-------+
|   2   |   2   |   3   |   1   |
+-------+-------+-------+-------+
// Drop old column
+-------+-------+-------+
|   B   |   C   | tmp_C |
+-------+-------+-------+
|   2   |   3   |   1   |
+-------+-------+-------+
// Rename new column
+-------+-------+-------+
|   B   |   C   |   A   |
+-------+-------+-------+
|   2   |   3   |   1   |
+-------+-------+-------+

Я бы реализовал это в pyspark следующим образом:

from pyspark.sql import SparkSession
from pyspark.sql.function import when, col

def clean_data(spark_session, file_path):
    df = (
        spark_session
        .read
        .csv(file_path, header='true')
    )

    df = (
        df
        .withColumn(
            "tmp_C",
            when(
                col("C") == 1,
                col("C")
            ).otherwise("A")
        )
        .withColumn(
            "C",
            when(
                col("C") == 1,
                col("B")
            ).otherwise("C")
        )
        .withColumn(
            "B",
            when(
                col("C") == 1,
                col("A")
            ).otherwise("B")
        )
    )

    df = df.drop("A")
    df = df.withColumnRenamed("tmp_C", "A")

    return df

Мне это выглядит не слишком красиво, и я не уверен, что это лучший способ решить эту проблему. Я совершенно новичок в Spark и хотел бы узнать лучший подход к этой ситуации, хотя это работает. Кроме того, я также хотел бы знать, является ли это хорошим примером использования Spark (обратите внимание, что набор данных, который я использую, большой, и полей намного больше, чем этот. Приведенный выше пример значительно упрощен).

Ответы [ 2 ]

0 голосов
/ 07 февраля 2020

Что ж, это может быть быстрее, если вы отобразите каждый столбец по порядку вращения в соответствующий столбец.

// generate columns map
maps = dict(zip(['C', 'A', 'B'], ['A', 'B', 'C']))

// regular approach:
// select columns with alias maps
df.select([col(c).alias(maps.get(c, c)) for c in df.columns])

// row scan approach:
// select columns with alias maps that satisfied specific condition
df.select([when(<map-condition>, col(c).alias(maps.get(c, c))).otherwise(col(c)) for c in df.columns])

Надеюсь, это поможет.

0 голосов
/ 07 февраля 2020

Да, если набор данных велик, вам следует использовать Spark.

Возможно, вам было бы лучше просто переименовать столбцы, а не сдвигать фактические данные? Предполагая, что проблема с данными систематизирована c, как в вашем примере. Это немного запутанно из-за проблем с переименованием, основанным на имени столбца, а не на позиции, поэтому вам придется сначала изменить имя на временное.

from functools import reduce

old_cols = df.columns
new_cols = old_cols[1:] + [old_cols[0]]
temp_cols = [col + "_" for col in new_cols]
# Rename columns with temporary names
df_temp = reduce(lambda df, idx: df.withColumnRenamed(old_cols[idx], temp_cols[idx]), range(len(old_cols)), df)
# Rename columns to align with correct data
df = reduce(lambda df_temp, idx: df_temp.withColumnRenamed(temp_cols[idx], new_cols[idx]), range(len(temp_cols)), df_temp)
# Then revert back to original column order
df = df.select(old_cols)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...