Тестирование служебной функции путем написания модульного теста в apache spark scala - PullRequest
2 голосов
/ 24 апреля 2019

У меня есть служебная функция, написанная на scala для чтения паркетных файлов из корзины s3. Может ли кто-нибудь помочь мне в написании модульных тестов для этого

Ниже приведена функция, которую необходимо проверить.

  def readParquetFile(spark: SparkSession,
                      locationPath: String): DataFrame = {
    spark.read
      .parquet(locationPath)
  }

Пока что я создал SparkSession, для которого мастер является локальным

import org.apache.spark.sql.SparkSession


trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession.builder().master("local").appName("Test App").getOrCreate()
  }

}

Я застрял с тестированием функции. Вот код, где я застрял. Вопрос заключается в том, должен ли я создать настоящий файл паркета и загрузить его, чтобы увидеть, создается ли фрейм данных или есть фальшивый фреймворк для проверки этого.

import com.github.mrpowers.spark.fast.tests.DataFrameComparer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.scalatest.FunSpec

class ReadAndWriteSpec extends FunSpec with DataFrameComparer with SparkSessionTestWrapper {

  import spark.implicits._

  it("reads a parquet file and creates a dataframe") {

  }

}

Edit:

Основываясь на комментариях к комментариям, я придумал ниже, но я все еще не могу понять, как это можно использовать.

Я использую https://github.com/findify/s3mock

class ReadAndWriteSpec extends FunSpec with DataFrameComparer with SparkSessionTestWrapper {

  import spark.implicits._

  it("reads a parquet file and creates a dataframe") {

    val api = S3Mock(port = 8001, dir = "/tmp/s3")
    api.start

    val endpoint = new EndpointConfiguration("http://localhost:8001", "us-west-2")
    val client = AmazonS3ClientBuilder
      .standard
      .withPathStyleAccessEnabled(true)
      .withEndpointConfiguration(endpoint)
      .withCredentials(new AWSStaticCredentialsProvider(new AnonymousAWSCredentials()))
      .build

    /** Use it as usual. */
    client.createBucket("foo")
    client.putObject("foo", "bar", "baz")
    val url = client.getUrl("foo","bar")

    println(url.getFile())

    val df = ReadAndWrite.readParquetFile(spark,url.getPath())
    df.printSchema()

  }

}

1 Ответ

1 голос
/ 25 апреля 2019

Я разобрался и сохранил это просто. Я мог бы выполнить несколько базовых тестовых случаев.

Вот мое решение. Надеюсь, это кому-нибудь поможет.

import org.apache.spark.sql
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.{BeforeAndAfterEach, FunSuite}
import loaders.ReadAndWrite

class ReadAndWriteTestSpec extends FunSuite with BeforeAndAfterEach{

  private val master = "local"

  private val appName = "ReadAndWrite-Test"

  var spark : SparkSession = _

  override def beforeEach(): Unit = {
    spark = new sql.SparkSession.Builder().appName(appName).master(master).getOrCreate()
  }

  test("creating data frame from parquet file") {
    val sparkSession = spark
    import sparkSession.implicits._
    val peopleDF = spark.read.json("src/test/resources/people.json")
    peopleDF.write.mode(SaveMode.Overwrite).parquet("src/test/resources/people.parquet")

    val df = ReadAndWrite.readParquetFile(sparkSession,"src/test/resources/people.parquet")
    df.printSchema()

  }


  test("creating data frame from text file") {
    val sparkSession = spark
    import sparkSession.implicits._
    val peopleDF = ReadAndWrite.readTextfileToDataSet(sparkSession,"src/test/resources/people.txt").map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    peopleDF.printSchema()
  }

  test("counts should match with number of records in a text file") {
    val sparkSession = spark
    import sparkSession.implicits._
    val peopleDF = ReadAndWrite.readTextfileToDataSet(sparkSession,"src/test/resources/people.txt").map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    peopleDF.printSchema()

    assert(peopleDF.count() == 3)
  }

  test("data should match with sample records in a text file") {
    val sparkSession = spark
    import sparkSession.implicits._
    val peopleDF = ReadAndWrite.readTextfileToDataSet(sparkSession,"src/test/resources/people.txt").map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    peopleDF.printSchema()

    assert(peopleDF.take(1)(0)(0).equals("Michael"))
  }

  test("Write a data frame as csv file") {
    val sparkSession = spark
    import sparkSession.implicits._
    val peopleDF = ReadAndWrite.readTextfileToDataSet(sparkSession,"src/test/resources/people.txt").map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()

    //header argument should be boolean to the user to avoid confusions
    ReadAndWrite.writeDataframeAsCSV(peopleDF,"src/test/resources/out.csv",java.time.Instant.now().toString,",","true")
  }

  override def afterEach(): Unit = {
    spark.stop()
  }

}

case class Person(name: String, age: Int)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...