У меня есть юнит-тест (использующий PyTest ), который запускает мои PySpark тесты.У меня есть нормальный conftest.py
, который создает SQLContext .Я хотел бы получить один и тот же uuid4 во всех случаях, поэтому я пропатчил uuid4 в своем тесте.Если я вызываю uuid.uuid4()
из функции теста, все хорошо.
Однако, когда я запускаю задание PySpark, которое также вызывает uuid4, оно не исправлено:
Моя функция PySpark (упрощенная):
def create_uuid_if_needed(current, prev):
if current > prev:
return str(uuid.uuid4())
else:
return None
def my_df_func(df):
my_udf = udf(create_uuid_if_needed, T.StringType())
my_window = Window.partitionBy(F.col(PARTITIONING_KEY)).orderBy(F.col(ORDER))
return df.withColumn('new_col', my_udf(df.col, F.lag(df.col, 1)).over(my_window))
Мой тест выглядит следующим образом:
@patch.object(uuid, 'uuid4', return_value='1-1-1-1')
def test_add_activity_period_start_id(mocker, sql_context, input_fixture):
input_df = sql_context.createDataFrame(input_fixture, [... schema...])
good_uuid = str(uuid.uuid4())
another_goood_uuid = create_uuid_if_needed(2, 1)
actual_df = my_df_func(input_df)
...
good_uuid
получает правильное значение - '1-1-1-1', и то же самое another_good_uuid
, но udf-версия функции dataframe по-прежнему вызывает непатентованный uuid4.
Что здесь не так?Это то, что делает функция udf()
?Спасибо!