Я пишу агрегацию в pysaprk. В этот проект я также добавляю тест, в котором я создаю сеанс, помещаю некоторые данные, а затем запускаю агрегацию и проверяю результаты
Код выглядит следующим образом:
def mapper_convert_row(row):
#... specific of business logic code, eventually return one string value
return my_str
def run_spark_query(spark: SparkSession, from_dt, to_dt):
query = get_hive_query_str(from_dt, to_dt)
df = spark.sql(query).rdd.map(lambda row: Row(mapper_convert_row(row)))
out_schema = StructType([StructField("data", StringType())])
df_conv = spark.createDataFrame(df, out_schema)
df_conv.write.mode('overwrite').format("csv").save(folder)
А вот мой тестовый класс
class SparkFetchTest(unittest.TestCase):
@staticmethod
def getOrCreateSC():
conf = SparkConf()
conf.setMaster("local")
spark = (SparkSession.builder.config(conf=conf).appName("MyPySparkApp")
.enableHiveSupport().getOrCreate())
return spark
def test_fetch(self):
dt_from = datetime.strptime("2019-01-01-10-00", '%Y-%m-%d-%H-%M')
dt_to = datetime.strptime("2019-01-01-10-05", '%Y-%m-%d-%H-%M')
spark = self.getOrCreateSC()
self.init_and_populate_table_with_test_data(spark, input_tbl, dt_from, dt_to)
run_spark_query(spark, dt_from, dt_to)
# assert on results
Я добавил зависимости PySpark через среду Conda и запустил этот код через PyCharm. Просто чтобы прояснить: на моем локальном компьютере нет никакой искровой установки, кроме пакета PySpark Conda
Когда я устанавливаю точку останова внутри кода, она работает для меня в коде драйвера, но не останавливается внутри функция mapper_convert_row .
Как отладить эту функцию бизнес-логики c в локальной тестовой среде? Тот же подход в scala отлично работает, но этот код должен быть в python.