Spark UDF в Scala для извлечения соответствующих данных - PullRequest
0 голосов
/ 05 сентября 2018

У меня есть Dataframe, у которого есть столбец, который нуждается в некоторой очистке. Я с нетерпением жду шаблона регулярного выражения, который можно применить в пользовательском интерфейсе Spark в Java / Scala, который будет извлекать допустимое содержимое из строки.

Пример строки ввода столбца userId, как показано в приведенном ниже кадре данных:

[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]

Ожидаемое преобразование столбца с именем "userId":

Строка, которая выглядит как:

105286112|115090439|29818926

Мне нужна логика / подход, чтобы изменить столбец userId, чтобы сделать UDF таким же. Может ли это случиться с регулярным выражением или каким-то другим подходом?

Входной DataFrame выглядит следующим образом:

+--------------------+--------------------+
|    dt_geo_cat_brand|        userId      |
+--------------------+--------------------+
|2017-10-30_17-18 ...|[[133207500,2017-...|
|2017-10-19_21-22 ...|[[194112773,2017-...|
|2017-10-29_17-18 ...|[[274188233,2017-...|
|2017-10-29_14-16 ...|[[86281353,2017-1...|
|2017-10-01_09-10 ...|[[92478766,2017-1...|
|2017-10-09_17-18 ...|[[156663365,2017-...|
|2017-10-06_17-18 ...|[[111869972,2017-...|
|2017-10-13_09-10 ...|[[64404465,2017-1...|
|2017-10-13_07-08 ...|[[146355663,2017-...|
|2017-10-22_21-22 ...|[[54096488,2017-1...|
+--------------------+--------------------+

Схема:

root
 |-- dt_geo_cat_brand: string (nullable = true)
 |-- userId: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: string (nullable = true)

Желаемый выход:

+--------------------+--------------------+
|    dt_geo_cat_brand|         userId     |
+--------------------+--------------------+
|2017-10-30_17-18 ...|133207500,1993333444|
|2017-10-19_21-22 ...|122122212,3432323333|
|2017-10-29_17-18 ...|274188233,8869696966|
|2017-10-29_14-16 ...|862813534,444344444,43444343434|
|2017-10-01_09-10 ...|92478766,880342342,4243244432,5554335535|
+--------------------+--------------------+

и так далее ...

Ответы [ 2 ]

0 голосов
/ 05 сентября 2018

Вам не нужно регулярное выражение, чтобы решить эту проблему. Данные форматируются в виде массива структур, и, глядя на схему, вы хотите получить строку _1 для каждой структуры. Это можно решить с помощью UDF, которая извлекает значение и затем преобразует все в строку с mkString("|"), чтобы получить ожидаемый результат:

val extract_id = udf((arr: Seq[Row]) => { 
  arr.map(_.getAs[String](0)).mkString("|")
})

df.withColumn("userId", extract_id($"userId"))

Добавление согласно комментарию № 1:

Если вы хотите сохранить результат, разбитый на dt_geo_cat_brand в CSV-файле (все значения в отдельной строке), вы можете сделать это следующим образом. Сначала верните список из udf вместо строки и используйте explode:

val extract_id = udf((arr: Seq[Row]) => { 
  arr.map(_.getAs[String](0))
})

val df2 = df.withColumn("userId", explode(extract_id($"userId")))

Затем используйте partitionBy(dt_geo_cat_brand) при сохранении. Это создаст структуру папок в зависимости от значения в столбце dt_geo_cat_brand. В зависимости от разбиения число csv-файлов в каждой папке может отличаться, но все они будут иметь значения из одного значения в dt_geo_cat_brand (используйте repartition(1) перед сохранением, если вам нужен один файл и достаточно памяти).

df2.write.partitionBy("dt_geo_cat_brand").csv(baseOutputBucketPath)

Дополнительно согласно комментарию № 2:

Чтобы не использовать partitionBy при сохранении в виде отдельных файлов, вы можете сделать следующее (рекомендуется partitioBy appraoch). Сначала найдите все различные значения в dt_geo_cat_brand:

val vals = df.select("dt_geo_cat_brand").distinct().as[String].collect()

Для каждого из значений отфильтруйте фрейм данных и сохраните его (используйте разобранный df2 фрейм данных здесь как дополнение № 1):

vals.foreach { v =>
  df2.filter($"dt_geo_cat_brand" === v)
    .select("userId")
    .write
    .csv(s"$baseOutputBucketPath=$v/")})
}

В качестве альтернативы, не используйте разнесенный фрейм данных, а разбейте на "|", если используется этот udf:

vals.foreach { v =>
  df.filter($"dt_geo_cat_brand" === v)
    .select(split($"userId", "\\|").as("userId"))
    .write
    .csv(s"$baseOutputBucketPath=$v/")})
}
0 голосов
/ 05 сентября 2018

Напишите UDF, используя приведенное ниже регулярное выражение. Он извлечет то, что нужно.

import ss.implicits._

val df = ss.read.csv(path).as("")
df.show()

val reg = "\\[\\[(\\d*).*\\],\\s*\\[(\\d*).*\\],\\s*\\[(\\d*).*" // regex which can extract the required data

val input = "[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]"   // input string
val mat = reg.r.findAllIn(input)  // extracting the data

println(mat)
while (mat.hasNext) {
    mat.next()
    println(mat.group(1) + "|" + mat.group(2)+ "|" +  mat.group(3)) // each group will print the 3 extracted fields
}

Выход:

105286112|115090439|29818926

С UDF:

import ss.implicits._

    val reg = "\\[\\[(\\d*).*\\],\\s*\\[(\\d*).*\\],\\s*\\[(\\d*).*"

    def reg_func = { (s: String) =>
        {
            val mat = reg.r.findAllIn(s)

            println(mat)
            var out = ""
            while (mat.hasNext) {
                mat.next()
                out = mat.group(1) + "|" + mat.group(2) + "|" + mat.group(3)
            }
            out
        }
    }

    val reg_udf = udf(reg_func)

    val df = ss.read.text(path)
    .withColumn("Extracted_fields", reg_udf($"value"))
    df.show(false)

Ввод: создан некоторый образец 2-й записи

[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]
[[105286113,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090440,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818927,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]

Выход:

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|value                                                                                                                                                                                       |Extracted_fields            |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]|105286112|115090439|29818926|
|[[105286113,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090440,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818927,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]|105286113|115090440|29818927|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...