Запустите пользовательское преобразование для строковых столбцов - PullRequest
0 голосов
/ 04 октября 2019

Предположим, у меня есть следующий фрейм данных:

var df = Seq(
  ("2019-09-01", 0.1, 1, "0x0000000000000001", "0x00000001", "True"),
  ("2019-09-02", 0.2, 2, "0x0000000000000002", "0x00000002", "False"),
  ("2019-09-03", 0.3, 3, "0x0000000000000003", "0x00000003", "True")
).toDF("Timestamp", "Float", "Integer", "Hex1", "Hex2", "Bool")

Мне нужно выполнить преобразование для столбцов строки (в этом примере: Hex1, Hex2 и Bool) и преобразовать их в числовое значение, используя некоторыеПользовательская логика.

Фреймы данных генерируются чтением CSV-файлов, которые я не знаю схемы. Все, что я знаю, это то, что они содержат столбец Timestamp в качестве первого столбца, а затем переменное число столбцов, которые могут быть числовыми (целые или двойные / числа с плавающей запятой) или эти шестнадцатеричные и логические значения.

Я так думаюпреобразование должно было бы найти все строковые столбцы и для каждого из них запустить преобразование, которое добавит новый столбец в фрейм данных с числовым представлением строки. В этом случае шестнадцатеричные значения будут преобразованы в их десятичное представление. И строки «True», «False» будут преобразованы в 1 и 0. соответственно.

Возвращаясь к упрощенному примеру, я должен получить df, например:

|Timestamp  |Float|Integer|Hex1              |Hex2      |Bool |
|-----------|-----|-------|------------------|----------|-----|
|2019-09-01 |0.1  |1      |1                 |1         |1    |
|2019-09-02 |0.2  |2      |2                 |2         |0    |
|2019-09-03"|0.3  |3      |3                 |3         |1    |

Со всемичисловые (целые, плавающие или двойные) столбцы, за исключением отметки времени

Ответы [ 2 ]

2 голосов
/ 04 октября 2019

В соответствии с вашим примером используйте следующую функцию:

Используйте стандартную функцию conv, чтобы преобразовать шестнадцатеричный код в соответствующий тип. conv (num: Column, fromBase: Int, toBase: Int) : Column Преобразовать число в строковом столбце из одной базы в другую.

when (Условие столбца, Objectзначение): Оценивает список условий и возвращает одно из нескольких возможных выражений результата.

import org.apache.spark.sql.functions.conv
import org.apache.spark.sql.functions._

val s1 = df.
      withColumn("Hex1", conv(col("Hex1").substr(lit(3), length(col("Hex1"))), 16, 10) cast IntegerType).
      withColumn("Hex2", conv(col("Hex2").substr(lit(3), length(col("Hex2"))), 16, 10) cast IntegerType).
      withColumn("Bool", when(col("Bool") === "True", 1)
        .otherwise(0))

s1.show()
s1.printSchema()

Из определения вашей задачи, т. е. динамически . Если вы хотите выполнить ту же задачу динамически, вам нужно проделать дополнительную работу.

  1. Создать отображение, т.е. столбец и его карту типа данных: это можно абстрагировать, вы можете создать свой файл отображения извне. Может генерироваться динамически, читая файл сопоставления.
val list = List(
          ("Hex", "Hex1"),
          ("Hex", "Hex2"),
          ("Bool", "Bool")
        )
создать конвертер, используя сопоставление с образцом :
object Helper {
    def convert(columnDetail: (String, String)): Column = {
      columnDetail._1 match {
        case "Hex" => conv(col(columnDetail._2).substr(lit(3), length(col(columnDetail._2))), 16, 10) cast IntegerType
        case "Bool" => when(col(columnDetail._2) === "True", 1).otherwise(0)
      // your other case 
      }
    }
  }

, вы можете добавить все случаи и их соответствующую реализацию.

окончательное решение
    import spark.implicits._
    var df = Seq(
      ("2019-09-01", 0.1, 1, "0x0000000000000001", "0x00000001", "True"),
      ("2019-09-02", 0.2, 2, "0x0000000000000002", "0x00000002", "False"),
      ("2019-09-03", 0.3, 3, "0x0000000000000003", "0x00000003", "True")
    ).toDF("Timestamp", "Float", "Integer", "Hex1", "Hex2", "Bool")


    val list = List(
      ("Hex", "Hex1"),
      ("Hex", "Hex2"),
      ("Bool", "Bool")
    )

    val temp = list.foldLeft(df) { (tempDF, listValue) =>
      tempDF.withColumn(listValue._2, Helper.convert(listValue))
    }


    temp.show(false)
    temp.printSchema()

  }

  object Helper {
    def convert(columnDetail: (String, String)): Column = {
      columnDetail._1 match {
        case "Hex" => conv(col(columnDetail._2).substr(lit(3), length(col(columnDetail._2))), 16, 10) cast IntegerType
        case "Bool" => when(col(columnDetail._2) === "True", 1).otherwise(0)
        // your other case 
      }
    }
  }

Результат:

+----------+-----+-------+----+----+----+
|Timestamp |Float|Integer|Hex1|Hex2|Bool|
+----------+-----+-------+----+----+----+
|2019-09-01|0.1  |1      |1   |1   |1   |
|2019-09-02|0.2  |2      |2   |2   |0   |
|2019-09-03|0.3  |3      |3   |3   |1   |
+----------+-----+-------+----+----+----+

root
 |-- Timestamp: string (nullable = true)
 |-- Float: double (nullable = false)
 |-- Integer: integer (nullable = false)
 |-- Hex1: integer (nullable = true)
 |-- Hex2: integer (nullable = true)
 |-- Bool: integer (nullable = false)
1 голос
/ 04 октября 2019

Ниже мой код искры, чтобы сделать это. Я использовал функцию конвектора spark sql http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.conv. Также, если вы хотите написать логику для динамической идентификации всех строковых столбцов во время выполнения и выполнения преобразования, это можно сделать, только если вы точно знаете, какой тип преобразования вы собираетесь делать.

var df = Seq(
  ("2019-09-01", 0.1, 1, "0x0000000000000001", "0x00000001", "True"),
  ("2019-09-02", 0.2, 2, "0x0000000000000002", "0x00000002", "False"),
  ("2019-09-03", 0.3, 3, "0x0000000000000003", "0x00000003", "True")
).toDF("Timestamp", "Float", "Integer", "Hex1", "Hex2", "Bool")

// df.show
df.createOrReplaceTempView("sourceTable")
val finalDF = spark.sql("""
select  Timestamp,
        Float,
        Integer,
        conv(substr(Hex1,3),16,10) as Hex1,
        conv(substr(Hex2,3),16,10) as Hex2,
        case when Bool = "True" then 1 
             when Bool = "False" then 0 
             else NULL 
        end as Bool  
from sourceTable
""")
finalDF.show

Результат:

+----------+-----+-------+----+----+----+
| Timestamp|Float|Integer|Hex1|Hex2|Bool|
+----------+-----+-------+----+----+----+
|2019-09-01|  0.1|      1|   1|   1|   1|
|2019-09-02|  0.2|      2|   2|   2|   0|
|2019-09-03|  0.3|      3|   3|   3|   1|
+----------+-----+-------+----+----+----+
...