Функция Python в Spark - PullRequest
       3

Функция Python в Spark

1 голос
/ 11 октября 2019

Я пытаюсь округлить столбец deadline_date вперед или назад, основываясь на флаге в фрейме data_prioritization_rounding. -1 означает назад, 0 означает ничего, а 1 означает вперед.

Функция работает, когда я использую одну дату в качестве переменной, но я изо всех сил пытался применить ее ко всему набору данных. Я получаю сообщение об ошибке: «ValueError: Невозможно преобразовать столбец в bool: используйте« & »для« и »,« | »для 'or', '~' для 'not' при построении логических выражений DataFrame. "при попытке передать часть столбца функции. Относительно новая функция построения в Python.

from pyspark.sql.functions import next_day, date_sub
from pyspark.sql.functions import to_date


def next_date(column,date,dayOfWEek):
  if column == -1:
    return date_sub(next_day(date,dayOfWEek),0)
  elif column == 1:
    return date_sub(next_day(date,dayOfWEek),7)
  else:
    return date


activity_prioritization_rounding= sql("""select * from spa.activity_master""")
activity_prioritization_rounding.withColumn(
   "New_Date",
    next_date(col("deadline_rounding"),col("deadline_date"),"Friday"))
)

1 Ответ

2 голосов
/ 11 октября 2019

Вам нужно создать udf из вашей функции python и отправить Friday в виде столбца, так как он не будет транслироваться через фрейм данных. Вы можете сделать это, используя lit.

from pyspark.sql.functions import udf, next_day, date_sub, to_date, lit
from pyspark.sql.types import DateType

activity_prioritization_rounding.withColumn("New_Date",udf(next_date(col("deadline_rounding"),col("deadline_date"),lit("Friday"), DateType())))

РЕДАКТИРОВАТЬ: Как справедливо упомянуто @jxc, вы не можете использовать функции искры внутри UDF.

Упрощение этого до when().when().otherwise()

from pyspark.sql.functions import udf, next_day, date_sub, to_date, lit, when, col

day_of_week = "Friday"
activity_prioritization_rounding.withColumn("New_Date", when(
    col("deadline_rounding") == -1, date_sub(next_day(col("deadline_date"), day_of_week), 0)).when(
    col("deadline_rounding") == 1, date_sub(next_day(col("deadline_date"), day_of_week), 7)).otherwise(
    col("deadline_date")))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...