Исправление функции в PySpark с использованием MagicMock не исправляет в процессе запуска - PullRequest
0 голосов
/ 10 апреля 2019

У меня есть юнит-тест (использующий PyTest ), который запускает мои PySpark тесты.У меня есть нормальный conftest.py, который создает SQLContext .Я хотел бы получить один и тот же uuid4 во всех случаях, поэтому я пропатчил uuid4 в своем тесте.Если я вызываю uuid.uuid4() из функции теста, все хорошо.

Однако, когда я запускаю задание PySpark, которое также вызывает uuid4, оно не исправлено:

Моя функция PySpark (упрощенная):

def create_uuid_if_needed(current, prev):
    if current > prev:
        return str(uuid.uuid4())
    else:
        return None


def my_df_func(df):
    my_udf = udf(create_uuid_if_needed, T.StringType())    
    my_window = Window.partitionBy(F.col(PARTITIONING_KEY)).orderBy(F.col(ORDER))
    return df.withColumn('new_col', my_udf(df.col, F.lag(df.col, 1)).over(my_window))

Мой тест выглядит следующим образом:

@patch.object(uuid, 'uuid4', return_value='1-1-1-1')
def test_add_activity_period_start_id(mocker, sql_context, input_fixture):
    input_df = sql_context.createDataFrame(input_fixture, [... schema...])    
    good_uuid = str(uuid.uuid4())
    another_goood_uuid = create_uuid_if_needed(2, 1)
    actual_df = my_df_func(input_df)
    ...

good_uuid получает правильное значение - '1-1-1-1', и то же самое another_good_uuid, но udf-версия функции dataframe по-прежнему вызывает непатентованный uuid4.

Что здесь не так?Это то, что делает функция udf()?Спасибо!

...