PySpark Преобразуйте алгоритм в UDF и примените его к DataFrame. - PullRequest
1 голос
/ 15 января 2020

Я написал алгоритм, который что-то делает и печатает вывод. Вход для моего алгоритма представляет собой список с некоторыми целыми числами. Вот пример ввода в виде списка.

`mylist = [5,6,14,15,16,17,18,19,20,28,40,41,42,43,44,55]`

и вот мой алгоритм

    ```     

tduration = 0
duration = 0
avg = 0
bottleneck = 0
y = 0
x = 0
while x<len(mylist)-4 and y<len(mylist)-1 :
  if mylist[x+4] == mylist[x]+4:
    y = x + 4
    print("MY LIST X = ",mylist[x])
    print("X = ", x)
    print ("Y = ", y)
    while True:
      if y==len(mylist)-1 or mylist[y+1] > mylist[y]+10:
        bottleneck = bottleneck + 1
        duration = mylist[y] - mylist[x] + 1
        tduration = tduration + duration
        avg = tduration/bottleneck
        x = y + 1
        print("MY LIST Y = " , mylist[y])
        print("Duration = " , duration)
        break
      else: 
        y = y + 1
  else: 
    x = x + 1
print("BottleneckCount = ", bottleneck,  "\nAverageDuration = ", avg)

 ```

Теперь я хочу преобразовать этот «алгоритм» в пользователя Определенная функция (UDF) в PySpark, а затем применить эту UDF к DataFrame с одним столбцом. В каждой строке этого DataFrame есть один список. Образец DataFrame имеет 1 столбец и 2 строки. row1 - это список [10,11,19,20,21,22,23,24,25,33,45], а row2 - это список [55,56,57,58,59,60,80,81,82,83,84,85,92,115], поэтому UDF следует применять к каждой строке DataFrame отдельно и предоставлять результаты для каждой строки в другом столбце. Заранее спасибо за ваше время и помощь. Я буду голосовать ваши ответы

Ответы [ 2 ]

1 голос
/ 16 января 2020

Вот способ, которым вы можете сделать:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, ArrayType

def calculate(mylist):

    tduration = 0
    duration = 0
    avg = 0
    bottleneck = 0
    y = 0
    x = 0
    while x<len(mylist)-4 and y<len(mylist)-1 :

        if mylist[x+4] == mylist[x]+4:
            y = x + 4
            print("MY LIST X = ",mylist[x])
            print("X = ", x)
            print ("Y = ", y)
            while True:
                if y==len(mylist)-1 or mylist[y+1] > mylist[y]+10:
                    bottleneck = bottleneck + 1
                    duration = mylist[y] - mylist[x] + 1
                    tduration = tduration + duration
                    avg = tduration/bottleneck
                    x = y + 1
                    print("MY LIST Y = " , mylist[y])
                    print("Duration = " , duration)
                    break
                else:
                    y = y + 1
        else:
            x = x + 1
    return bottleneck, avg

# sample data frame to use
df = spark.createDataFrame(
    [
        [[10,11,19,20,21,22,23,24,25,33,45]],
        [[55,56,57,58,59,60,80,81,82,83,84,85,92,115]],
    ],
    ['col1',]
)

df.show()

+--------------------+
|                col1|
+--------------------+
|[10, 11, 19, 20, ...|
|[55, 56, 57, 58, ...|
+--------------------+

# convert values to int  --- edit
f_to_int = F.udf(lambda x: list(map(int, x)))
df = df.withColumn('col1', f_to_int('col1'))

# create udf
func = F.udf(lambda x: calculate(x), ArrayType(IntegerType()))

# apply udf
df = df.withColumn('vals', func('col1'))

# create new cols
df = df.select("col1", df.vals[0].alias('bottleneck'), df.vals[1].alias('avg'))

df.show()

+--------------------+----------+----+
|                col1|bottleneck| avg|
+--------------------+----------+----+
|[10, 11, 19, 20, ...|         1|null|
|[55, 56, 57, 58, ...|         2|null|
+--------------------+----------+----+
0 голосов
/ 24 января 2020

YOLO ответил на этот вопрос, и это полный ответ. Единственная проблема заключается в том, что в последнем столбце «avg» мы получаем значения NULL. Я понял, что могу решить эту проблему, используя вместо этого «fun c» вместо «fun *1004*» в ответе YOLO.

import pyspark.sql.types as T
func = F.udf(lambda x: calculate(x), T.StructType(
        [T.StructField("val1", T.IntegerType(), True),
         T.StructField("val2", T.FloatType(), True)]))
...