Вам следует использовать Spark DataFrame (также называемый Spark SQL) API , когда это возможно, вместо API RDD более низкого уровня, который вы показали (rdd.map()
, rdd.foreach()
...).
Как правило, это означает загрузку данных в DataFrame df
, а затем использование df.withColumn()
для создания новых столбцов с преобразованием, примененным к предыдущим столбцам.RDD все еще используются ниже в конце, но многие вещи оптимизированы для вас с помощью высокоуровневого API DataFrame.
Вот небольшое приложение Scala, показывающее, как применять замены шаблонов к DataFrame с помощью функции Spark SQLregexp_replace
.
import org.apache.log4j.{Logger, Level}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Column
object Main {
def main(args: Array[String]): Unit = {
// Set logging level to avoid Spark log spam
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
// Build Spark SQL session (mine is version 2.3.2)
val spark = SparkSession.builder
.appName("scalaTest1")
.master("local[*]")
.getOrCreate()
// Import required to use Spark SQL methods like toDF() and calling columns with '
import spark.implicits._
// Create some basic DataFrame
val df1 = List(
(1, "I got pattern1 and pattern2."),
(2, "I don't have any."),
(3, "Oh, that pattern1 I have too.")
).toDF("id", "sentence")
df1.show(false)
//+---+-----------------------------+
//|id |sentence |
//+---+-----------------------------+
//|1 |I got pattern1 and pattern2. |
//|2 |I don't have any. |
//|3 |Oh, that pattern1 I have too.|
//+---+-----------------------------+
// Create replacements map
val replacements = Map(
"pattern1" -> "replacement1",
"pattern2" -> "replacement2",
"I " -> "you "
)
// Import required to use functions on DataFrame columns such as regexp_replace()
import org.apache.spark.sql.functions._
// Create a new column with one of the replacements applied to "sentence" column
val df2 = df1.withColumn(
"new",
regexp_replace('sentence, "pattern1", replacements("pattern1"))
)
df2.show(false)
//+---+-----------------------------+---------------------------------+
//|id |sentence |new |
//+---+-----------------------------+---------------------------------+
//|1 |I got pattern1 and pattern2. |I got replacement1 and pattern2. |
//|2 |I don't have any. |I don't have any. |
//|3 |Oh, that pattern1 I have too.|Oh, that replacement1 I have too.|
//+---+-----------------------------+---------------------------------+
// With the first two replacements applied to "sentence" column by nesting one inside the other
val df3 = df1.withColumn(
"new",
regexp_replace(
regexp_replace('sentence, "pattern2", replacements("pattern2")),
"pattern1",
replacements("pattern1")
)
)
df3.show(false)
//+---+-----------------------------+------------------------------------+
//|id |sentence |new |
//+---+-----------------------------+------------------------------------+
//|1 |I got pattern1 and pattern2. |I got replacement1 and replacement2.|
//|2 |I don't have any. |I don't have any. |
//|3 |Oh, that pattern1 I have too.|Oh, that replacement1 I have too. |
//+---+-----------------------------+------------------------------------+
// Same, but applying all replacements recursively with "foldLeft" instead of nesting every replacement
val df4 = df1.withColumn(
"new",
replacements.foldLeft(df1("sentence")) {
case (c: Column, (pattern: String, replacement: String)) => regexp_replace(c, pattern, replacement)
}
)
df4.show(false)
//+---+-----------------------------+--------------------------------------+
//|id |sentence |new |
//+---+-----------------------------+--------------------------------------+
//|1 |I got pattern1 and pattern2. |you got replacement1 and replacement2.|
//|2 |I don't have any. |you don't have any. |
//|3 |Oh, that pattern1 I have too.|Oh, that replacement1 you have too. |
//+---+-----------------------------+--------------------------------------+
// Select the columns you want to keep and rename if necessary
val df5 = df4.select('id, 'new).withColumnRenamed("new", "sentence")
df5.show(false)
//+---+--------------------------------------+
//|id |sentence |
//+---+--------------------------------------+
//|1 |you got replacement1 and replacement2.|
//|2 |you don't have any. |
//|3 |Oh, that replacement1 you have too. |
//+---+--------------------------------------+
}
}
Существуют различные библиотеки для чтения из JSON в Scala, здесь я собираюсь использовать метод Spark SQL spark.read.json(path)
, чтобы не добавлять другую зависимость,даже если использование Spark для чтения такого маленького файла может показаться излишним.
Обратите внимание, что функция, которую я использую, ожидает определенный формат файла, состоящий из одного допустимого объекта JSON на строку, и вы должны иметь возможность отобразитьполя JSON в столбцы данных.
Это содержимое файла replacements.json
, который я создал:
{"pattern":"pattern1" , "replacement": "replacement1"}
{"pattern":"pattern2" , "replacement": "replacement2"}
{"pattern":"I " , "replacement": "you "}
А вот небольшое приложение, переписанное для чтения замен этогофайл, поместите их в карту, а затем примените их к данным с помощью метода foldLeft, который я показал в конце предыдущего.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Column, SparkSession}
object Main2 {
def main(args: Array[String]): Unit = {
// Set logging level to avoid Spark log spam
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
// Build Spark SQL session (mine is version 2.3.2)
val spark = SparkSession.builder
.appName("scalaTest1")
.master("local[*]")
.getOrCreate()
// Import required to use Spark SQL methods like toDF() and calling columns with '
import spark.implicits._
// Import required to use functions on DataFrame columns such as regexp_replace()
import org.apache.spark.sql.functions._
// Create some basic DataFrame
val df1 = List(
(1, "I got pattern1 and pattern2."),
(2, "I don't have any."),
(3, "Oh, that pattern1 I have too.")
).toDF("id", "sentence")
df1.show(false)
//+---+-----------------------------+
//|id |sentence |
//+---+-----------------------------+
//|1 |I got pattern1 and pattern2. |
//|2 |I don't have any. |
//|3 |Oh, that pattern1 I have too.|
//+---+-----------------------------+
// Read replacements json file into a DataFrame
val replacements_path = "/path/to/your/replacements.json"
val replacements_df = spark.read.json(replacements_path)
replacements_df.show(false)
//+--------+------------+
//|pattern |replacement |
//+--------+------------+
//|pattern1|replacement1|
//|pattern2|replacement2|
//|I |you |
//+--------+------------+
// Turn DataFrame into a Map for ease of use in next step
val replacements_map = replacements_df
.collect() // Brings all the df data from all Spark executors to the Spark driver, use only if df is small!
.map(row => (row.getAs[String]("pattern"), row.getAs[String]("replacement")))
.toMap
print(replacements_map)
// Map(pattern1 -> replacement1, pattern2 -> replacement2, I -> you )
// Apply replacements recursively with "foldLeft"
val df2 = df1.withColumn(
"new",
replacements_map.foldLeft(df1("sentence")) {
case (c: Column, (pattern: String, replacement: String)) => regexp_replace(c, pattern, replacement)
}
)
df2.show(false)
//+---+-----------------------------+--------------------------------------+
//|id |sentence |new |
//+---+-----------------------------+--------------------------------------+
//|1 |I got pattern1 and pattern2. |you got replacement1 and replacement2.|
//|2 |I don't have any. |you don't have any. |
//|3 |Oh, that pattern1 I have too.|Oh, that replacement1 you have too. |
//+---+-----------------------------+--------------------------------------+
// Select the columns you want to keep and rename if necessary
val df3 = df2.select('id, 'new).withColumnRenamed("new", "sentence")
df3.show(false)
//+---+--------------------------------------+
//|id |sentence |
//+---+--------------------------------------+
//|1 |you got replacement1 and replacement2.|
//|2 |you don't have any. |
//|3 |Oh, that replacement1 you have too. |
//+---+--------------------------------------+
}
}
В вашем последнемприложение, удалите df.show()
и print()
.Искровые «трансформации» являются «ленивыми».Это означает, что Spark просто сложит то, что вы просите сделать, в граф выполнения (DAG) без выполнения.Только когда вы заставите его действовать, например, когда вы используете df.show()
или df.save()
для записи данных куда-либо (это называется «действиями»), он будет анализировать DAG, оптимизировать его и фактически выполнять преобразованияна данных.Вот почему вы должны избегать использования таких действий, как df.show()
для промежуточных преобразований.