Если вы хотите проверить sparkSession.read.jdbc(...)
, вы можете играть с базой данных H2 в памяти.Я делаю это иногда, когда пишу учебные тесты.Вы можете найти пример здесь: https://github.com/bartosz25/spark-scala-playground/blob/d3cad26ff236ae78884bdeb300f2e59a616dc479/src/test/scala/com/waitingforcode/sql/LoadingDataTest.scala Обратите внимание, однако, что вы можете столкнуться с некоторыми тонкими различиями с "настоящими" СУБД.
С другой стороны, вы можете лучше разделить проблемы кода исоздайте DataFrame
по-другому, например, с помощью метода toDF(...)
.Вы можете найти пример здесь: https://github.com/bartosz25/spark-scala-playground/blob/77ea416d2493324ddd6f3f2be42122855596d238/src/test/scala/com/waitingforcode/sql/CorrelatedSubqueryTest.scala
И наконец, ИМО, если вам нужно высмеивать DataFrameReader
, это означает, что, возможно, что-то связано с разделением кода.Например, вы можете поместить все свои фильтры в объект Filters
и протестировать каждый фильтр отдельно.То же самое для функций отображения или агрегирования.2 года назад я написал сообщение в блоге о тестировании Apache Spark - https://www.waitingforcode.com/apache-spark/testing-spark-applications/read В нем описывается API RDD, но идея разделения проблем одинакова.
Обновлено:
object Filters {
def isInFileTypes(inputDataFrame: DataFrame, fileTypes: Seq[String]): DataFrame = {
inputDataFrame.where(col("column").isin(fileTypes: _*))
}
}
object ConfigurationLoader {
def readTable(tableName: String)(implicit spark: SparkSession): DataFrame = {
val input = spark.read
.format("jdbc")
.option("url", s"$postgresUrl/$postgresDatabase")
.option("dbtable", tableName)
.option("user", postgresUsername)
.option("password", postgresPassword)
.option("driver", postgresDriver)
.load()
Filters.isInFileTypes(input, Seq("txt", "doc")
}
И с этим вы можете проверять свою логику фильтрации как хотите :) Если у вас есть больше фильтров и вы хотите их протестировать, вы также можете объединить их одним способом, передать любой DataFrame
, который вы хотите, и вуаля :) Выне следует проверять .load()
, если у вас нет для этого веских причин.Это внутренняя логика Apache Spark, уже протестированная.
Обновление, ответьте за:
Итак, теперь я могу тестировать фильтры, но как убедиться, что readTable действительноиспользуйте правильный фильтр (извините за тщательность, это просто вопрос полного охвата).Вероятно, у вас есть простой подход к макетированию объекта scala (на самом деле это вторая проблема).- dytyniak 14 минут назад
object MyApp {
def main(args: Array[String]): Unit = {
val inputDataFrame = readTable(postgreSQLConnection)
val outputDataFrame = ProcessingLogic.generateOutputDataFrame(inputDataFrame)
}
}
object ProcessingLogic {
def generateOutputDataFrame(inputDataFrame: DataFrame): DataFrame = {
// Here you apply all needed filters, transformations & co
}
}
Как видите, не надо издеваться над object
здесь.Это кажется избыточным, но это не потому, что вы можете протестировать каждый фильтр изолированно благодаря объекту Filters
и всей вашей логике обработки, объединенной благодаря объекту ProcessingLogic
(например, имя).И вы можете создать свой DataFrame
любым допустимым способом.Недостатком является то, что вам нужно явно определить схему или использовать case classes
, поскольку в вашем исходном коде PostgreSQL Apache Spark автоматически разрешит схему (я объяснил это здесь: https://www.waitingforcode.com/apache-spark-sql/schema-projection/read).