pyspark: неожиданное поведение при использовании нескольких функций UDF в одном столбце (с массивами) - PullRequest
1 голос
/ 17 июня 2020

Кто-нибудь знает, что происходит, когда вы используете несколько функций udf:

Я создаю тестовый фрейм данных и две exaple udf функции:

from pyspark.sql.functions import udf

mylist = [
    [[1,2,3]],
    [[4,5,6]]
]

def f1(tlist):
    tlist[0]=111
    return 'result f1 is: {}'.format(tlist)
f1_udf = udf(f1, )

def f2(tlist):
    tlist[1]=222
    return 'result f2 is: {}'.format(tlist)
f2_udf = udf(f2, )

df = spark().createDataFrame(mylist).toDF('arr')
df.show()

Получение следующего результата:

+---------+
|      arr|
+---------+
|[1, 2, 3]|
|[4, 5, 6]|
+---------+

затем я применяю каждую функцию отдельно:

df.withColumn('f1', f1_udf('arr')).show(10, False)

дает

+---------+-------------------------+
|arr      |f1                       |
+---------+-------------------------+
|[1, 2, 3]|result f1 is: [111, 2, 3]|
|[4, 5, 6]|result f1 is: [111, 5, 6]|
+---------+-------------------------+

и

df.withColumn('f2', f2_udf('arr')).show(10,False)

дает

+---------+-------------------------+
|arr      |f2                       |
+---------+-------------------------+
|[1, 2, 3]|result f2 is: [1, 222, 3]|
|[4, 5, 6]|result f2 is: [4, 222, 6]|
+---------+-------------------------+

Но, и здесь начинается неожиданное поведение

(df
 .withColumn('f1', f1_udf('arr'))
 .withColumn('f2', f2_udf('arr'))
).show(10, False)

дает неожиданный результат, смешивая результат обеих функций во втором вызове функции

+---------+-------------------------+---------------------------+
|arr      |f1                       |f2                         |
+---------+-------------------------+---------------------------+
|[1, 2, 3]|result f1 is: [111, 2, 3]|result f2 is: [111, 222, 3]|
|[4, 5, 6]|result f1 is: [111, 5, 6]|result f2 is: [111, 222, 6]|
+---------+-------------------------+---------------------------+
                                                    ^^^ : unexpected result

А при изменении порядка вызова функций

(df
 .withColumn('f2', f2_udf('arr'))
 .withColumn('f1', f1_udf('arr'))
).show(10, False)

дает другой, тоже неожиданный результат:

+---------+-------------------------+---------------------------+
|arr      |f2                       |f1                         |
+---------+-------------------------+---------------------------+
|[1, 2, 3]|result f2 is: [1, 222, 3]|result f1 is: [111, 222, 3]|
|[4, 5, 6]|result f2 is: [4, 222, 6]|result f1 is: [111, 222, 6]|
+---------+-------------------------+---------------------------+
                                                         ^^^ : different unexpected result

Кажется, что вызов функций в фиксированном, неизмененном столбце искры не является независимым друг от друга, а это означает, что если мы вызываем обе функции (даже с большим количеством другого кода между ними), результаты первого вызова функции смешиваются в следующий ... Или я что-то упустил?

1 Ответ

1 голос
/ 17 июня 2020

Spark передает один и тот же массив в обе функции f1 и f2. Поскольку первая функция изменяет содержимое массива, вторая функция также видит эти изменения. Вы можете увидеть, что если вы добавите строку

print("f1: id  of array is {}, content is {}".format(id(tlist), tlist))

к первой функции и

print("f2: id  of array is {}, content is {}".format(id(tlist), tlist))

ко второй функции.

Это напечатает

f1: id  of array is 139782923179912, content is [1, 2, 3]
f2: id  of array is 139782923179912, content is [111, 2, 3]
f1: id  of array is 139782923180040, content is [4, 5, 6]
f2: id  of array is 139782923180040, content is [111, 5, 6]

(возможно, напечатано немного не по порядку)

Таким образом, вторая функция видит массив, который был изменен в первой функции.

Чтобы решить проблему, функции должны создать свои собственные копии массива и изменять только эти копии:

def f1(tlist):
    print("f1: id  of array is {}, content is {}".format(id(tlist), tlist))
    newlist = tlist.copy()
    newlist[0]=111
    return 'result f1 is: {}'.format(newlist)

Соответственно f2.

Другой способ получить ожидаемое поведение - объявить оба udf как не- Detectoristi c:

f1_udf = F.udf(f1, ).asNondeterministic()
f2_udf = F.udf(f2, ).asNondeterministic()

Однако я не могу объяснить, почему это помогает.

...