Как издеваться над Spark DataFrameReader, используя scala? - PullRequest
2 голосов
/ 03 апреля 2019

Я хочу выполнить модульный тестовый код, который читает DataFrame из RDBMS, используя sparkSession.read.jdbc(...). Но я не нашел способа заставить DataFrameReader вернуть фиктивный DataFrame для теста.

Пример кода:

object ConfigurationLoader {

def readTable(tableName: String)(implicit spark: SparkSession): DataFrame = {
    spark.read
      .format("jdbc")
      .option("url", s"$postgresUrl/$postgresDatabase")
      .option("dbtable", tableName)
      .option("user", postgresUsername)
      .option("password", postgresPassword)
      .option("driver", postgresDriver)
      .load()
  }

def loadUsingFilter(dummyFilter: String*)(implicit spark: SparkSession): DataFrame = {
    readTable(postgresFilesTableName)
      .where(col("column").isin(fileTypes: _*))
  }
}

И вторая проблема - чтобы смоделировать scala-объект, похоже, мне нужно использовать другой подход для создания такого сервиса.

Ответы [ 2 ]

4 голосов
/ 03 апреля 2019

По моему мнению, модульные тесты не предназначены для тестирования соединений с базой данных. Это должно быть сделано в интеграционных тестах, чтобы проверить, что все части работают вместе. Модульные тесты предназначены только для проверки вашей функциональной логики, а не способности Spark читать из базы данных.

Вот почему я разработал бы ваш код немного иначе и сделал бы это, не заботясь о БД.

/** This, I don't test. I trust spark.read */
def readTable(tableName: String)(implicit spark: SparkSession): DataFrame = {
    spark.read
    .option(...)
    ...
    .load()
    // Nothing more
}

/** This I test, this is my logic. */
def transform(df : DataFrame, dummyFilter: String*): DataFrame = {
    df
      .where(col("column").isin(fileTypes: _*))
}

Тогда я использую код таким образом в производстве.

val source = readTable("...")
val result = transform(source, filter)

А теперь transform, который содержит мою логику, легко проверить. Если вам интересно, как создать фиктивные кадры данных, мне нравится один из следующих способов:

val df = Seq((1, Some("a"), true), (2, Some("b"), false), 
      (3, None, true)).toDF("x", "y", "z")
// and the test
val result = transform(df, filter)
result should be ...
2 голосов
/ 03 апреля 2019

Если вы хотите проверить sparkSession.read.jdbc(...), вы можете играть с базой данных H2 в памяти.Я делаю это иногда, когда пишу учебные тесты.Вы можете найти пример здесь: https://github.com/bartosz25/spark-scala-playground/blob/d3cad26ff236ae78884bdeb300f2e59a616dc479/src/test/scala/com/waitingforcode/sql/LoadingDataTest.scala Обратите внимание, однако, что вы можете столкнуться с некоторыми тонкими различиями с "настоящими" СУБД.

С другой стороны, вы можете лучше разделить проблемы кода исоздайте DataFrame по-другому, например, с помощью метода toDF(...).Вы можете найти пример здесь: https://github.com/bartosz25/spark-scala-playground/blob/77ea416d2493324ddd6f3f2be42122855596d238/src/test/scala/com/waitingforcode/sql/CorrelatedSubqueryTest.scala

И наконец, ИМО, если вам нужно высмеивать DataFrameReader, это означает, что, возможно, что-то связано с разделением кода.Например, вы можете поместить все свои фильтры в объект Filters и протестировать каждый фильтр отдельно.То же самое для функций отображения или агрегирования.2 года назад я написал сообщение в блоге о тестировании Apache Spark - https://www.waitingforcode.com/apache-spark/testing-spark-applications/read В нем описывается API RDD, но идея разделения проблем одинакова.


Обновлено:

object Filters {
  def isInFileTypes(inputDataFrame: DataFrame, fileTypes: Seq[String]): DataFrame = {
    inputDataFrame.where(col("column").isin(fileTypes: _*))
  }
}

object ConfigurationLoader {

def readTable(tableName: String)(implicit spark: SparkSession): DataFrame = {
    val input = spark.read
      .format("jdbc")
      .option("url", s"$postgresUrl/$postgresDatabase")
      .option("dbtable", tableName)
      .option("user", postgresUsername)
      .option("password", postgresPassword)
      .option("driver", postgresDriver)
      .load()
    Filters.isInFileTypes(input, Seq("txt", "doc")
  }

И с этим вы можете проверять свою логику фильтрации как хотите :) Если у вас есть больше фильтров и вы хотите их протестировать, вы также можете объединить их одним способом, передать любой DataFrame, который вы хотите, и вуаля :) Выне следует проверять .load(), если у вас нет для этого веских причин.Это внутренняя логика Apache Spark, уже протестированная.


Обновление, ответьте за:

Итак, теперь я могу тестировать фильтры, но как убедиться, что readTable действительноиспользуйте правильный фильтр (извините за тщательность, это просто вопрос полного охвата).Вероятно, у вас есть простой подход к макетированию объекта scala (на самом деле это вторая проблема).- dytyniak 14 минут назад

object MyApp {
  def main(args: Array[String]): Unit = {
    val inputDataFrame = readTable(postgreSQLConnection)
    val outputDataFrame = ProcessingLogic.generateOutputDataFrame(inputDataFrame)  
  }
}

object ProcessingLogic {
  def generateOutputDataFrame(inputDataFrame: DataFrame): DataFrame = {
    // Here you apply all needed filters, transformations & co
  }
}

Как видите, не надо издеваться над object здесь.Это кажется избыточным, но это не потому, что вы можете протестировать каждый фильтр изолированно благодаря объекту Filters и всей вашей логике обработки, объединенной благодаря объекту ProcessingLogic (например, имя).И вы можете создать свой DataFrame любым допустимым способом.Недостатком является то, что вам нужно явно определить схему или использовать case classes, поскольку в вашем исходном коде PostgreSQL Apache Spark автоматически разрешит схему (я объяснил это здесь: https://www.waitingforcode.com/apache-spark-sql/schema-projection/read).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...