В настоящее время я работаю над очисткой набора данных и пытаюсь сделать это с помощью pyspark. Данные считываются в фрейм данных из CSV, и значения, которые мне нужны, находятся в соответствующих строках, но для некоторых строк значения перемешиваются. Мне нужно повернуть значения этих строк, чтобы значения были в правильных столбцах. Например, допустим, у меня есть следующий набор данных:
+-------+-------+-------+
| A | B | C |
+-------+-------+-------+
| 2 | 3 | 1 |
+-------+-------+-------+
Но значения в первой строке должны быть
+-------+-------+-------+
| A | B | C |
+-------+-------+-------+
| 1 | 2 | 3 |
+-------+-------+-------+
Мое текущее решение - добавить временный столбец и переназначить значения каждому столбцу и переименовать временный столбец, удаляя старый:
// Add temporary column C
+-------+-------+-------+-------+
| A | B | C | tmp_C |
+-------+-------+-------+-------+
| 2 | 3 | 1 | 1 |
+-------+-------+-------+-------+
// Shift values
+-------+-------+-------+-------+
| A | B | C | tmp_C |
+-------+-------+-------+-------+
| 2 | 2 | 3 | 1 |
+-------+-------+-------+-------+
// Drop old column
+-------+-------+-------+
| B | C | tmp_C |
+-------+-------+-------+
| 2 | 3 | 1 |
+-------+-------+-------+
// Rename new column
+-------+-------+-------+
| B | C | A |
+-------+-------+-------+
| 2 | 3 | 1 |
+-------+-------+-------+
Я бы реализовал это в pyspark следующим образом:
from pyspark.sql import SparkSession
from pyspark.sql.function import when, col
def clean_data(spark_session, file_path):
df = (
spark_session
.read
.csv(file_path, header='true')
)
df = (
df
.withColumn(
"tmp_C",
when(
col("C") == 1,
col("C")
).otherwise("A")
)
.withColumn(
"C",
when(
col("C") == 1,
col("B")
).otherwise("C")
)
.withColumn(
"B",
when(
col("C") == 1,
col("A")
).otherwise("B")
)
)
df = df.drop("A")
df = df.withColumnRenamed("tmp_C", "A")
return df
Мне это выглядит не слишком красиво, и я не уверен, что это лучший способ решить эту проблему. Я совершенно новичок в Spark и хотел бы узнать лучший подход к этой ситуации, хотя это работает. Кроме того, я также хотел бы знать, является ли это хорошим примером использования Spark (обратите внимание, что набор данных, который я использую, большой, и полей намного больше, чем этот. Приведенный выше пример значительно упрощен).