Используйте rlike с регулярным выражением столбца в искре 1.5.1 - PullRequest
0 голосов
/ 17 марта 2020

Я хочу отфильтровать фрейм данных на основе применения значений регулярных выражений в одном из столбцов к другому столбцу.

Example:
Id Column1 RegexColumm
1  Abc     A.*
2  Def     B.*
3  Ghi     G.*

Результат фильтрации фрейма данных с использованием RegexColumm должен давать строки с идентификаторами 1 и 3.

Есть ли способ сделать это в spark 1.5.1? Не хочу использовать UDF, так как это может вызвать проблемы с масштабируемостью в поисках искробезопасного API.

Ответы [ 2 ]

0 голосов
/ 18 марта 2020
scala> val df = Seq((1,"Abc","A.*"),(2,"Def","B.*"),(3,"Ghi","G.*")).toDF("id","Column1","RegexColumm")
df: org.apache.spark.sql.DataFrame = [id: int, Column1: string ... 1 more field]

scala> val requiredDF = df.filter(x=> x.getAs[String]("Column1").matches(x.getAs[String]("RegexColumm")))
requiredDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, Column1: string ... 1 more field]

scala> requiredDF.show
+---+-------+-----------+
| id|Column1|RegexColumm|
+---+-------+-----------+
|  1|    Abc|        A.*|
|  3|    Ghi|        G.*|
+---+-------+-----------+

Вы можете использовать как выше, я думаю, это то, что вы любите. Пожалуйста, дайте мне знать, если это поможет вам.

0 голосов
/ 17 марта 2020

Вы можете преобразовать df -> rdd, затем, пройдя через строку, мы можем сопоставить regex и , отфильтровать только соответствующие данные без использования UDF.

Example:

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

 df.show()
//+---+-------+--------+
//| id|column1|regexCol|
//+---+-------+--------+
//|  1|    Abc|     A.*|
//|  2|    Def|     B.*|
//|  3|    Ghi|     G.*|
//+---+-------+--------+

//creating new schema to add new boolean field
val sch = StructType(df.schema.fields ++ Array(StructField("bool_col", BooleanType, false)))

//convert df to rdd and match the regex using .map
val rdd = df.rdd.map(row => {
  val regex = row.getAs[String]("regexCol")
  val bool = row.getAs[String]("column1").matches(regex)
  val bool_col = s"$bool".toBoolean
  val newRow = Row.fromSeq(row.toSeq ++ Array(bool_col))
  newRow
})

//convert rdd to dataframe filter out true values for bool_col
val final_df = sqlContext.createDataFrame(rdd, sch).where(col("bool_col")).drop("bool_col")
final_df.show(10)

//+---+-------+--------+
//| id|column1|regexCol|
//+---+-------+--------+
//|  1|    Abc|     A.*|
//|  3|    Ghi|     G.*|
//+---+-------+--------+

UPDATE:

Вместо .map мы можем использовать .mapPartition ( карта против mapPartiiton ):

val rdd = df.rdd.mapPartitions(
    partitions => {
      partitions.map(row => {
        val regex = row.getAs[String]("regexCol")
        val bool = row.getAs[String]("column1").matches(regex)
        val bool_col = s"$bool".toBoolean
        val newRow = Row.fromSeq(row.toSeq ++ Array(bool_col))
        newRow
      })
    })
...