Please find below a simple example of how to test a DataFrame. You can split the task into two parts: first, test the transformation itself (shown below), and then use a simple shell script to verify the written file.
import com.holdenkarau.spark.testing._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.scalatest.{FunSuite, Matchers}

class SomeDFTest extends FunSuite with Matchers with DataFrameSuiteBase {

  import spark.implicits._

  test("Testing a simple DataFrame transformation") {
    // Schema of the input DataFrame
    val inputSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false)
    )

    // Schema expected after the transformation adds the extra column
    val expectedSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false),
      StructField("dummyColumn", StringType, false)
    )

    val inputData = Seq(
      Row(8, "bat"),
      Row(64, "mouse"),
      Row(-27, "horse")
    )

    val expectedData = Seq(
      Row(8, "bat", "test"),
      Row(64, "mouse", "test"),
      Row(-27, "horse", "test")
    )

    val inputDF = spark.createDataFrame(
      spark.sparkContext.parallelize(inputData),
      StructType(inputSchema)
    )

    val expectedDF = spark.createDataFrame(
      spark.sparkContext.parallelize(expectedData),
      StructType(expectedSchema)
    )

    // Apply the transformation under test and compare it with the expected result
    val actual = transformSomeDf(inputDF)
    assertDataFrameEquals(actual, expectedDF)
  }

  // The transformation under test: adds a literal "test" column
  def transformSomeDf(df: DataFrame): DataFrame = {
    df.withColumn("dummyColumn", lit("test"))
  }
}
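For the second part (checking the written file), the suggestion above is a small shell script. As an alternative kept in the same language, here is a minimal Scala sketch of a test that could live inside the same SomeDFTest suite: it writes the transformed DataFrame and reads it back. The output path and the Parquet format are hypothetical placeholders chosen only for illustration.

  test("Transformed data can be written and read back") {
    // Hypothetical output location and format, only for illustration
    val outputPath = "/tmp/some-df-test-output"

    val written = transformSomeDf(Seq((8, "bat"), (64, "mouse")).toDF("number", "word"))
    written.write.mode("overwrite").parquet(outputPath)

    // Read the file back and compare contents without relying on row order
    val readBack = spark.read.parquet(outputPath)
    readBack.count() shouldBe written.count()
    readBack.except(written).count() shouldBe 0L
  }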
build.sbt config

name := "SparkTest"

version := "0.1"

scalaVersion := "2.11.8"

// Keep the Spark version in sync with the prefix of the spark-testing-base artifact below
val sparkVersion = "2.4.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
  "com.holdenkarau" %% "spark-testing-base" % "2.4.0_0.11.0" % Test
)
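The spark-testing-base documentation also recommends disabling parallel test execution (so multiple suites do not compete for a SparkContext in the same JVM) and giving the forked test JVM more memory. A typical addition to build.sbt, with the heap sizes as an assumption you may want to tune, looks like this:

// Spark test suites should not run in parallel within one JVM
parallelExecution in Test := false

// Run tests in a forked JVM with a larger heap
fork in Test := true
javaOptions ++= Seq("-Xms512M", "-Xmx2048M")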