Pyspark применяет функцию к датафрейму - PullRequest
0 голосов
/ 09 января 2020

Мой python метод ниже;

def leadtime_crossdock_calc(slt, wlt, dow, freq):
    temp_lt = [0, 0, 0, 0, 0, 0, 0]
    remaining = []
    for i in range(0, 7):
        remaining.append((dow[i:] + dow[:i]).index(1))
    for i in range(7):
        if freq[i] == 1:
            supplier_lt = int(slt[i])
            warehouse_lt = int(wlt[(i + supplier_lt) % 7])
            waiting = int(remaining[(i + supplier_lt + warehouse_lt) % 7])
            temp_lt[i] = supplier_lt + warehouse_lt + waiting
    for i in range(7):
        if temp_lt[i] == 0:
            temp_lt[i] = next((value for index, value in enumerate(temp_lt[i:] + temp_lt[:i]) if value), None)
    return ''.join(str(x) for x in temp_lt)

И это получается для приведенного ниже примера;

leadtime_crossdock_calc([0,2,0,2,0,3,0],[1,1,1,1,1,1,1],[0,0,1,0,1,0,1],[0,1,0,1,0,1,0])

'3333443'

Вопрос в том, у меня есть кадр данных искры, как показано ниже;

Product  Store  slt               wlt            dow              freq
A         B     [0,2,0,2,0,3,0]  [1,1,1,1,1,1,1] [0,0,1,0,1,0,1]  [0,1,0,1,0,1,0]

Я хочу создать новый столбец для каждой новой строки в кадре данных, используя вышеуказанный метод;

Product  Store  slt               wlt            dow              freq              result
A         B     [0,2,0,2,0,3,0]  [1,1,1,1,1,1,1] [0,0,1,0,1,0,1]  [0,1,0,1,0,1,0]   [3,3,3,3,4,4,3]

Не могли бы вы помочь мне об этом? Я не смог применить метод для фрейма данных spark.

1 Ответ

0 голосов
/ 11 января 2020

Вы можете использовать Пользовательские функции или UDF Сначала зарегистрируйте свой UDF при искре, указав тип возвращаемой функции. Вы можете использовать что-то вроде этого:

from pyspark.sql.types import StringType, col
leadtime_udf = spark.udf.register("leadtime_udf", leadtime_crossdock_calc, StringType())

Затем вы можете применить этот UDF к вашему DataFrame (или также в Spark SQL)

df.select("*", leadtime_udf(col(slt), ... , col(freq)))

Надеюсь, это поможет

...