Модульное тестирование цепочки преобразования искровых данных - PullRequest
0 голосов
/ 28 января 2019

Я очень новичок в scala spark экосистеме, и мне интересно, как лучше всего провести модульное тестирование преобразования цепочки данных. Вот пример кода метода, который я хотел бы протестировать

def writeToParquet(spark: SparkSession, dataFrame: DataFrame, col1: DataType1, col2:DataType2): Unit {
    dataFrame
        .withColumn("date", some_columnar_date_logic)
        .withColumn("hour", some_more_functional_logic)
        .... //couple more transformation logic
        .write
        .mode(SaveMode.Append)
        .partitionBy("col1", "col2", "col3")
        .parquet("some hdfs/s3/url")        
} 

Проблема в том, что паркет имеет тип возврата Unit, что затрудняет тестирование.Эта проблема еще более усугубляется тем фактом, что преобразования по своей природе являются неизменными, что делает насмешку и шпионаж немного сложным

Чтобы создать фрейм данных, я сбросил набор тестовых данных в csv

Ответы [ 2 ]

0 голосов
/ 12 февраля 2019

Первое, что я понял при тестировании фрейма данных, - это разделение Transformation и IOs

Таким образом, для вышеупомянутых сценариев мы можем разделить вышеуказанную цепочку на три части

class Coordinator {
    def transformAndWrite(dataframe: Dataframe): Unit = {
transformedDf = dataFrame
        .withColumn("date", some_columnar_date_logic)
        .withColumn("hour", some_more_functional_logic)
        .... //couple more transformation logic
partitionedDfWriter = transformedDf.write
        .mode(SaveMode.Append)
        .partitionBy("col1", "col2", "col3")

и

partitionedDfWriter.parquet("some hdfs/s3/url")
}

теперь мы можем переместить их в три отдельных класса,

DFTransformer, DFPartitioner и DataFrameParquetWriter extends ResourceWriter

Итак, кодможет стать примерно таким

class DFTransformer {
    def transform(dataframe:DataFrame): Dataframe = {
        return dataFrame
        .withColumn("date", some_columnar_date_logic)
        .withColumn("hour", some_more_functional_logic)
        .... //couple more transformation logic

}
class DfPartitioner {
    def partition(dataframe: DataFrame): DataFrameWriter = {
        return dataframe.write
        .mode(SaveMode.Append)
        .partitionBy("col1", "col2", "col3")
    }
}

и

class DataFrameParquetWriter extends ResourceWriter {
    overide def write(partitionedDfWriter: DataFrameWriter) = {
       partitionedDfWriter.parquet("some hdfs/s3/url") 

    }

class Coordinator(dfTransformer:DfTransformer, dfPartitioner: DFPartitioner, resourceWriter: ResourceWriter) {
    val transformedDf = dfTransformer.transform(dataframe)
    val partitionedDfWriter = dfPartitioner.partition(transformedDf)
    resourceWriter.write(partitionedDfWriter)
}
  • Преимущество вышеизложенного состоит в том, что когда вам приходится тестировать свой класс координатора, выможно очень легко использовать Mockito для проверки ваших зависимостей.

  • Тестирование DFTransformer теперь также просто, вы можете передать заглушенный Dataframe и утвердить возвращенный DataFrame. (с помощью spark-тестирования-база).Мы также можем проверить столбцы, возвращенные Преобразованием.Мы также можем проверить количество

0 голосов
/ 28 января 2019

Пожалуйста, найдите простой пример для тестирования блока данных.Вы можете разделить его на две части.Первый.чтобы проверить преобразование, и вы можете сделать простой сценарий оболочки, чтобы проверить записанный файл

import com.holdenkarau.spark.testing._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.scalatest.{FunSuite, Matchers}

class SomeDFTest extends FunSuite with Matchers with DataFrameSuiteBase    {
 import spark.implicits._

  test("Testing Input customer data date transformation") {


    val inputSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false)
    )
    val expectedSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false),
      StructField("dummyColumn", StringType, false)

    )
    val inputData = Seq(
      Row(8, "bat"),
      Row(64, "mouse"),
      Row(-27, "horse")
    )

    val expectedData = Seq(
      Row (8, "bat","test"),
      Row(64, "mouse","test"),
      Row(-27, "horse","test")
    )

    val inputDF = spark.createDataFrame(
      spark.sparkContext.parallelize(inputData),
      StructType(inputSchema)
    )

    val expectedDF = spark.createDataFrame(
      spark.sparkContext.parallelize(expectedData),
      StructType(expectedSchema)
    )


    val actual = transformSomeDf(inputDF)

    assertDataFrameEquals(actual, expectedDF) // equal



  }

  def transformSomeDf(df:DataFrame):DataFrame={
    df.withColumn("dummyColumn",lit("test"))
  }
}

Sbt.build config

name := "SparkTest"

version := "0.1"

scalaVersion := "2.11.8"

val sparkVersion = "2.3.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
"com.holdenkarau" %% "spark-testing-base" % "2.4.0_0.11.0" % Test

)
...