Scala + Как сделать замену заполнителя в столбце Spark Dataframe из файла? - PullRequest
1 голос
/ 05 апреля 2019

MyPlaceHolder.json

[[" PHPHONENUMBER ", "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)"],    
  [" PHPHONENUMBER ", "(^|\\W)(\\(0[\\d\\s]{1,}\\)[\\s|\\-|\\d{1,}]{1,})($|\\W)"],[" PHPHONENUMBER ", "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)"],    
  [" PHPHONENUMBER ", "(^|\\W)(\\(0[\\d\\s]{1,}\\)[\\s|\\-|\\d{1,}]{1,})($|\\W)"],[" PHPHONENUMBER ", "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)"],    
  [" PHPHONENUMBER ", "(^|\\W)(\\(0[\\d\\s]{1,}\\)[\\s|\\-|\\d{1,}]{1,})($|\\W)"],[" PHPHONENUMBER ", "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)"],    
  [" PHPHONENUMBER ", "(^|\\W)(\\(0[\\d\\s]{1,}\\)[\\s|\\-|\\d{1,}]{1,})($|\\W)"]]

По сути, мне нужно прочитать этот файл и заменить шаблон в столбце DF заполнителем.

Для примера: любой шаблон, подобный этому "(^|\\W)(\\+\\d{1,}\\s*\\(?\\d{1,}\\)?[\\s|\\-|\\d{1,}]{1,})($|\\W)" shold get replace with " PHPHONENUMBER "

Я Python Я сделал что-то вроде этого, как показано ниже.

replacement_patterns = get_config_object__(os.getcwd() + REPLACEMENT_PATTERN_FILE_PATH)


def placeholder_replacement(text, replacement_patterns):
    """
     This function replace the place holder with reference to replacement_patterns.

     Parameters
     ----------
     text : String
         Input string to the function.

     replacement_patterns : json
         json object of placeholder replacement_patterns pattern.

     Returns
     -------
     text : String
         Output string with replacement of placeholder.
     """

    for replacement, pattern in replacement_patterns:
        text = re.compile(pattern, re.IGNORECASE | re.UNICODE).sub(replacement, text)
    return text

def get_config_object__(config_file_path):
    """
     This function mainly load the configuration object in json form.

     Parameters
     ----------
     config_file_path : str
         Configuration path.

     Returns
     -------
     config_object : JSON object
         Configuration object.
     """

    config_file = open(config_file_path)
    config_object = json.load(config_file)
    config_file.close()
    return config_object

Как я могу заменить этот тип замены файла в столбце фрейма данных?

Note:: I can not change file, its cross used a placeholder.json.(I know it's not json but can't help it)

Its inside resource folder.

Здесьэто то, что я пытаюсь, однако это просто эксперимент.Пожалуйста, не стесняйтесь предлагать что-то из коробки.ничего не получилось, я пробовал разные вещи, но, поскольку я плохо знаком с языком, мне нужна помощь.

    val inputPath = getClass.getResource("/input_data/placeholder_replacement.txt").getPath

    val inputDF = spark.read.option("delimiter", "|").option("header", true).option("ignoreLeadingWhiteSpace", true).option("ignoreTrailingWhiteSpace", true).csv(inputPath)


    val replacement_pattern = getClass.getResource("/unitmetrics-replacement-patterns.json").getPath

    val replacement_pattern_DF = (spark.read.text(replacement_pattern))


    val myval = replacement_pattern_DF.rdd.map(row => row.getString(0).split("],").toList).collect()


    val removeNonGermanLetterFunction = udf((col: String) => {



      myval.foreach { x =>

        x.foreach { x =>

          var key = x.split("\",")(0).replaceAll("[^0-9a-zA-ZäöüßÄÖÜẞ _]", "")
          var value = x.split("\",")(1).replaceAll("\"", "")

          val regex = value.r

          regex.replaceAllIn(col, key)


        }
      }
    }
    )


    val input = inputDF.withColumn("new", removeNonGermanLetterFunction(col("duplicate_word_col")))

    input.show()

1 Ответ

1 голос
/ 05 апреля 2019

Вам следует использовать Spark DataFrame (также называемый Spark SQL) API , когда это возможно, вместо API RDD более низкого уровня, который вы показали (rdd.map(), rdd.foreach() ...).

Как правило, это означает загрузку данных в DataFrame df, а затем использование df.withColumn() для создания новых столбцов с преобразованием, примененным к предыдущим столбцам.RDD все еще используются ниже в конце, но многие вещи оптимизированы для вас с помощью высокоуровневого API DataFrame.

Вот небольшое приложение Scala, показывающее, как применять замены шаблонов к DataFrame с помощью функции Spark SQLregexp_replace.

import org.apache.log4j.{Logger, Level}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Column

object Main {

  def main(args: Array[String]): Unit = {

    // Set logging level to avoid Spark log spam
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // Build Spark SQL session (mine is version 2.3.2)
    val spark = SparkSession.builder
      .appName("scalaTest1")
      .master("local[*]")
      .getOrCreate()

    // Import required to use Spark SQL methods like toDF() and calling columns with '
    import spark.implicits._

    // Create some basic DataFrame
    val df1 = List(
      (1, "I got pattern1 and pattern2."),
      (2, "I don't have any."),
      (3, "Oh, that pattern1 I have too.")
    ).toDF("id", "sentence")

    df1.show(false)
    //+---+-----------------------------+
    //|id |sentence                     |
    //+---+-----------------------------+
    //|1  |I got pattern1 and pattern2. |
    //|2  |I don't have any.            |
    //|3  |Oh, that pattern1 I have too.|
    //+---+-----------------------------+

    // Create replacements map
    val replacements = Map(
      "pattern1" -> "replacement1",
      "pattern2" -> "replacement2",
      "I " -> "you "
    )

    // Import required to use functions on DataFrame columns such as regexp_replace()
    import org.apache.spark.sql.functions._

    // Create a new column with one of the replacements applied to "sentence" column
    val df2 = df1.withColumn(
      "new",
      regexp_replace('sentence, "pattern1", replacements("pattern1"))
    )

    df2.show(false)
    //+---+-----------------------------+---------------------------------+
    //|id |sentence                     |new                              |
    //+---+-----------------------------+---------------------------------+
    //|1  |I got pattern1 and pattern2. |I got replacement1 and pattern2. |
    //|2  |I don't have any.            |I don't have any.                |
    //|3  |Oh, that pattern1 I have too.|Oh, that replacement1 I have too.|
    //+---+-----------------------------+---------------------------------+

    // With the first two replacements applied to "sentence" column by nesting one inside the other
    val df3 = df1.withColumn(
      "new",
      regexp_replace(
        regexp_replace('sentence, "pattern2", replacements("pattern2")),
        "pattern1",
        replacements("pattern1")
      )
    )

    df3.show(false)
    //+---+-----------------------------+------------------------------------+
    //|id |sentence                     |new                                 |
    //+---+-----------------------------+------------------------------------+
    //|1  |I got pattern1 and pattern2. |I got replacement1 and replacement2.|
    //|2  |I don't have any.            |I don't have any.                   |
    //|3  |Oh, that pattern1 I have too.|Oh, that replacement1 I have too.   |
    //+---+-----------------------------+------------------------------------+

    // Same, but applying all replacements recursively with "foldLeft" instead of nesting every replacement
    val df4 = df1.withColumn(
      "new",
      replacements.foldLeft(df1("sentence")) {
        case (c: Column, (pattern: String, replacement: String)) => regexp_replace(c, pattern, replacement)
      }
    )
    df4.show(false)
    //+---+-----------------------------+--------------------------------------+
    //|id |sentence                     |new                                   |
    //+---+-----------------------------+--------------------------------------+
    //|1  |I got pattern1 and pattern2. |you got replacement1 and replacement2.|
    //|2  |I don't have any.            |you don't have any.                   |
    //|3  |Oh, that pattern1 I have too.|Oh, that replacement1 you have too.   |
    //+---+-----------------------------+--------------------------------------+

    // Select the columns you want to keep and rename if necessary
    val df5 = df4.select('id, 'new).withColumnRenamed("new", "sentence")
    df5.show(false)
    //+---+--------------------------------------+
    //|id |sentence                              |
    //+---+--------------------------------------+
    //|1  |you got replacement1 and replacement2.|
    //|2  |you don't have any.                   |
    //|3  |Oh, that replacement1 you have too.   |
    //+---+--------------------------------------+

  }

}

Существуют различные библиотеки для чтения из JSON в Scala, здесь я собираюсь использовать метод Spark SQL spark.read.json(path), чтобы не добавлять другую зависимость,даже если использование Spark для чтения такого маленького файла может показаться излишним.

Обратите внимание, что функция, которую я использую, ожидает определенный формат файла, состоящий из одного допустимого объекта JSON на строку, и вы должны иметь возможность отобразитьполя JSON в столбцы данных.

Это содержимое файла replacements.json, который я создал:

{"pattern":"pattern1" , "replacement": "replacement1"}
{"pattern":"pattern2" , "replacement": "replacement2"}
{"pattern":"I " , "replacement": "you "}

А вот небольшое приложение, переписанное для чтения замен этогофайл, поместите их в карту, а затем примените их к данным с помощью метода foldLeft, который я показал в конце предыдущего.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Column, SparkSession}

object Main2 {

  def main(args: Array[String]): Unit = {

    // Set logging level to avoid Spark log spam
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // Build Spark SQL session (mine is version 2.3.2)
    val spark = SparkSession.builder
      .appName("scalaTest1")
      .master("local[*]")
      .getOrCreate()

    // Import required to use Spark SQL methods like toDF() and calling columns with '
    import spark.implicits._
    // Import required to use functions on DataFrame columns such as regexp_replace()
    import org.apache.spark.sql.functions._


    // Create some basic DataFrame
    val df1 = List(
      (1, "I got pattern1 and pattern2."),
      (2, "I don't have any."),
      (3, "Oh, that pattern1 I have too.")
    ).toDF("id", "sentence")
    df1.show(false)
    //+---+-----------------------------+
    //|id |sentence                     |
    //+---+-----------------------------+
    //|1  |I got pattern1 and pattern2. |
    //|2  |I don't have any.            |
    //|3  |Oh, that pattern1 I have too.|
    //+---+-----------------------------+

    // Read replacements json file into a DataFrame
    val replacements_path = "/path/to/your/replacements.json"
    val replacements_df = spark.read.json(replacements_path)
    replacements_df.show(false)
    //+--------+------------+
    //|pattern |replacement |
    //+--------+------------+
    //|pattern1|replacement1|
    //|pattern2|replacement2|
    //|I       |you         |
    //+--------+------------+

    // Turn DataFrame into a Map for ease of use in next step
    val replacements_map = replacements_df
      .collect() // Brings all the df data from all Spark executors to the Spark driver, use only if df is small!
      .map(row => (row.getAs[String]("pattern"), row.getAs[String]("replacement")))
      .toMap
    print(replacements_map)
    // Map(pattern1 -> replacement1, pattern2 -> replacement2, I  -> you )

    // Apply replacements recursively with "foldLeft"
    val df2 = df1.withColumn(
      "new",
      replacements_map.foldLeft(df1("sentence")) {
        case (c: Column, (pattern: String, replacement: String)) => regexp_replace(c, pattern, replacement)
      }
    )
    df2.show(false)
    //+---+-----------------------------+--------------------------------------+
    //|id |sentence                     |new                                   |
    //+---+-----------------------------+--------------------------------------+
    //|1  |I got pattern1 and pattern2. |you got replacement1 and replacement2.|
    //|2  |I don't have any.            |you don't have any.                   |
    //|3  |Oh, that pattern1 I have too.|Oh, that replacement1 you have too.   |
    //+---+-----------------------------+--------------------------------------+

    // Select the columns you want to keep and rename if necessary
    val df3 = df2.select('id, 'new).withColumnRenamed("new", "sentence")
    df3.show(false)
    //+---+--------------------------------------+
    //|id |sentence                              |
    //+---+--------------------------------------+
    //|1  |you got replacement1 and replacement2.|
    //|2  |you don't have any.                   |
    //|3  |Oh, that replacement1 you have too.   |
    //+---+--------------------------------------+

  }

}

В вашем последнемприложение, удалите df.show() и print().Искровые «трансформации» являются «ленивыми».Это означает, что Spark просто сложит то, что вы просите сделать, в граф выполнения (DAG) без выполнения.Только когда вы заставите его действовать, например, когда вы используете df.show() или df.save() для записи данных куда-либо (это называется «действиями»), он будет анализировать DAG, оптимизировать его и фактически выполнять преобразованияна данных.Вот почему вы должны избегать использования таких действий, как df.show() для промежуточных преобразований.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...