Использование когда и иначе при преобразовании логических значений в строки в Pyspark - PullRequest
0 голосов
/ 02 июля 2018

У меня есть фрейм данных в Pyspark

df.show()


+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  true|
|  2| Ram|      Y|      0.05|   10| false|
|  3| Ian|      N|      0.01|    1| false|
|  4| Jim|      N|       1.2|    3|  true|
+---+----+-------+----------+-----+------+

Схема ниже:

DataFrame[id: int, name: string, testing: string, avg_result: string, score: string, active: boolean]

Я хочу преобразовать Y в True, N в False true в True и false в False.

Когда я делаю, как показано ниже:

for col in cols:
    df = df.withColumn(col, f.when(f.col(col) == 'N', 'False').when(f.col(col) == 'Y', 'True').
                       when(f.col(col) == 'true', True).when(f.col(col) == 'false', False).otherwise(f.col(col)))

Я получаю ошибку ниже, и в кадре данных нет изменений

pyspark.sql.utils.AnalysisException: u"cannot resolve 'CASE WHEN (testing = N) THEN False WHEN (testing = Y) THEN True WHEN (testing = true) THEN true WHEN (testing = false) THEN false ELSE testing' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;"

+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  true|
|  2| Ram|      Y|      0.05|   10| false|
|  3| Ian|      N|      0.01|    1| false|
|  4| Jim|      N|       1.2|    3|  true|
+---+----+-------+----------+-----+------+

Когда мне нравится, как показано ниже

for col in cols:
    df = df.withColumn(col, f.when(f.col(col) == 'N', 'False').when(f.col(col) == 'Y', 'True').otherwise(f.col(col)))

Я получаю ошибку ниже

pyspark.sql.utils.AnalysisException: u"cannot resolve 'CASE WHEN if ((isnull(active) || isnull(cast(N as double)))) null else CASE cast(cast(N as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN False WHEN if ((isnull(active) || isnull(cast(Y as double)))) null else CASE cast(cast(Y as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN True ELSE active' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;"

Но кадр данных меняется на

+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  true|
|  2| Ram|   True|      0.05|   10| false|
|  3| Ian|  False|      0.01|    1| false|
|  4| Jim|  False|       1.2|    3|  true|
+---+----+-------+----------+-----+------+

New attempt

for col in cols:
    df = df.withColumn(col, f.when(f.col(col) == 'N', 'False').when(f.col(col) == 'Y', 'True').
                       when(f.col(col) == 'true', 'True').when(f.col(col) == 'false', 'False').otherwise(f.col(col)))

Error received

pyspark.sql.utils.AnalysisException: u"cannot resolve 'CASE WHEN if ((isnull(active) || isnull(cast(N as double)))) null else CASE cast(cast(N as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN False WHEN if ((isnull(active) || isnull(cast(Y as double)))) null else CASE cast(cast(Y as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN True WHEN if ((isnull(active) || isnull(cast(true as double)))) null else CASE cast(cast(true as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN True WHEN if ((isnull(active) || isnull(cast(false as double)))) null else CASE cast(cast(false as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN False ELSE active' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;"

Как я могу получить фрейм данных как

+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  True|
|  2| Ram|   True|      0.05|   10| False|
|  3| Ian|  False|      0.01|    1| False|
|  4| Jim|  False|       1.2|    3|  True|
+---+----+-------+----------+-----+------+

Ответы [ 2 ]

0 голосов
/ 03 июля 2018

Как я уже упоминал в комментариях, проблема заключается в несоответствии типов. Вам нужно преобразовать логический столбец в строку перед выполнением сравнения. Наконец, вам также необходимо привести столбец к строке в otherwise() (в столбце нельзя использовать смешанные типы).

Ваш код легко изменить, чтобы получить правильный вывод:

import pyspark.sql.functions as f

cols = ["testing", "active"]
for col in cols:
    df = df.withColumn(
        col, 
        f.when(
            f.col(col) == 'N',
            'False'
        ).when(
            f.col(col) == 'Y',
            'True'
        ).when(
            f.col(col).cast('string') == 'true',
            'True'
        ).when(
            f.col(col).cast('string') == 'false',
            'False'
        ).otherwise(f.col(col).cast('string'))
    )
df.show()
#+---+----+-------+----------+-----+------+
#| id|name|testing|avg_result|score|active|
#+---+----+-------+----------+-----+------+
#|  1| sam|   null|      null| null|  True|
#|  2| Ram|   True|      0.05|   10| False|
#|  3| Ian|  False|      0.01|    1| False|
#|  4| Jim|  False|       1.2|    3|  True|
#+---+----+-------+----------+-----+------+

Однако есть и альтернативные подходы. Например, это хорошее место для использования pyspark.sql.Column.isin():

df = reduce(
    lambda df, col: df.withColumn(
        col, 
        f.when(
            f.col(col).cast('string').isin(['N', 'false']),
            'False'
        ).when(
            f.col(col).cast('string').isin(['Y', 'true']),
            'True'
        ).otherwise(f.col(col).cast('string'))
    ),
    cols,
    df
)
df.show()
#+---+----+-------+----------+-----+------+
#| id|name|testing|avg_result|score|active|
#+---+----+-------+----------+-----+------+
#|  1| sam|   null|      null| null|  True|
#|  2| Ram|   True|      0.05|   10| False|
#|  3| Ian|  False|      0.01|    1| False|
#|  4| Jim|  False|       1.2|    3|  True|
#+---+----+-------+----------+-----+------+

(Здесь я использовал reduce для устранения цикла for, но вы могли бы его сохранить.)

Вы также можете использовать pyspark.sql.DataFrame.replace(), но сначала вам нужно будет преобразовать активный столбец в строку:

df = df.withColumn('active', f.col('active').cast('string'))\
    .replace(['Y', 'true',], 'True', subset=cols)\
    .replace(['N', 'false'], 'False', subset=cols)\
df.show()
# results omitted, but it's the same as above

Или используя replace только один раз:

df = df.withColumn('active', f.col('active').cast('string'))\
    .replace(['Y', 'true', 'N', 'false'], ['True', 'True', 'False', 'False'], subset=cols)
0 голосов
/ 02 июля 2018

При рассмотрении схемы и примененных преобразований наблюдается несоответствие типов между возвращаемым String и Boolean. Например. 'N' возвращается как 'False' (строка), а 'false' возвращается как False (логическое значение)

Вы можете преобразовать преобразованные столбцы в строку, чтобы преобразовать Y в True, N в False, true в True и false в False.

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as f

data = [
  (1, "sam", None, None, None, True),
  (2, "Ram", "Y", 0.05, 10, False),
  (3, "Ian", "N", 0.01, 1, False),
  (4, "Jim", "N", 1.2, 3, True)
  ]

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("name", StringType(), True),
  StructField("testing", StringType(), True),
  StructField("avg_result", StringType(), True),
  StructField("score", StringType(), True),
  StructField("active", BooleanType(), True)
  ])

df = sc.parallelize(data).toDF(schema)

Перед применением преобразований

>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- testing: string (nullable = true)
|-- avg_result: string (nullable = true)
|-- score: string (nullable = true)
|-- active: boolean (nullable = true)

>>> df.show()
+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  true|
|  2| Ram|      Y|      0.05|   10| false|
|  3| Ian|      N|      0.01|    1| false|
|  4| Jim|      N|       1.2|    3|  true|
+---+----+-------+----------+-----+------+

Применение преобразования с приведением в пункте иначе .otherwise(f.col(col).cast("string"))

cols = ["testing", "active"]

for col in cols:
    df = df.withColumn(col, 
      f.when(f.col(col) == 'N', 'False')
      .when(f.col(col) == 'Y', 'True')
      .when(f.col(col).cast("string") == 'true', 'True')
      .when(f.col(col).cast("string") == 'false', 'False'))

Результаты

>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- testing: string (nullable = true)
|-- avg_result: string (nullable = true)
|-- score: string (nullable = true)
|-- active: string (nullable = true)

>>> df.show()
+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  True|
|  2| Ram|   True|      0.05|   10| False|
|  3| Ian|  False|      0.01|    1| False|
|  4| Jim|  False|       1.2|    3|  True|
+---+----+-------+----------+-----+------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...