Как мне вызвать функцию модульного теста в PySpark Code? - PullRequest
0 голосов
/ 01 апреля 2019

В моем коде PySpark есть модуль модульного тестирования, но я не уверен, как его выполнить.

Вот мой код, который просто читает один фрейм данных, который имеет только 2 столбца Day и Amount. Скрипт сохраняется как test.py Я просто делаю сумму в «День». Вот код -

import sys
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import *
import pytest
import unittest

def main():
    spark=SparkSession.builder.appName("MyApp").config("spark.sql.shuffle.partitions","2").getOrCreate()
    #Start ETL
    data = extract_data(spark)
    data_transformed = transform_data(data)

    # log the success and terminate Spark application
    spark.stop()
    return None

# Extract Data
def extract_data(spark):
    df = (spark.read.option("inferSchema", "true").option("header","true").csv("myfile.txt"))
    return df

# Transform Data
def transform_data(df):
    df_transformed = (df.groupBy("Day").sum("Amount").withColumnRenamed("sum(Amount)","total_amt").select("Day","total_amt"))
    return df_transformed

pytestmark = pytest.mark.usefixtures("spark")
def my_test_func(self):
    test_input = [Row(Day=1, Amount =10),\
                  Row(Day=1, Amount =20)]
    input_df = spark.createDataFrame(test_input)
    result = transform_data(input_df).select("total_amt").collect()[0]
    expected_result = 30
    self.assertEqual(result, expected_result)
    print("test done")

if __name__ == '__main__':
    main()

Я новичок в PySpark и у меня есть пара вопросов-

  1. Правильно ли объявлено мое модульное тестирование в приведенном выше коде? Если нет, то как это объявить?
  2. Как мне выполнить my_test_func? Где я должен это назвать? Я хочу запустить этот скрипт просто с помощью spark-submit

1 Ответ

2 голосов
/ 01 апреля 2019

Вы можете записать свой искровой тестовый модуль в другую папку.Например,

src
 +--jobs
      +-- job1

tests
 +--__jobs
      +---job1

Далее описывается, как написать тестовый пример


class TestJob1 \
            (unittest.TestCase):

    def setUp(self):
        """
        Start Spark, define config and path to test data
        """
        self.spark=SparkSession.builder
                               .appName("MyApp")
                               .config("spark.sql.shuffle.partitions","2")
                               .getOrCreate()
        self.job1 = Job1(self.spark)


    def tearDown(self):
        """
        Stop Spark
        """
        self.spark.stop()

    def test_yourtest_code(self):

        test_input = [Row(Day=1, Amount =10),
                      Row(Day=1, Amount =20)]
        input_df = spark.createDataFrame(test_input)
        result = transform_data(input_df).select("total_amt").collect()[0]
        expected_result = 30
        self.assertEqual(result, expected_result)
        print("test done")


Вы можете запустить тестовый пример

python -m unittest jobs.TestJob1
python -m unittest jobs.TestJob1.test_yourtest_code
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...