Создание нового столбца из другого столбца в Apache Spark с использованием UDF - PullRequest
0 голосов
/ 26 октября 2018

Я пытаюсь создать новый столбец из другого столбца в Apache Spark.

Данные (сильно сокращенно) выглядят как

Date    Day_of_Week
2018-05-26T00:00:00.000+0000    5
2018-05-05T00:00:00.000+0000    6

и должны выглядеть как

Date    Day_of_Week    Weekday
2018-05-26T00:00:00.000+0000    5    Thursday
2018-05-05T00:00:00.000+0000    6    Friday

Я попробовал совет из руководства https://docs.databricks.com/spark/latest/spark-sql/udf-python.html#register-the-function-as-a-udf & Как передать постоянное значение в Python UDF? & PySpark добавить столбец в DataFrame из столбца TimeStampType

, что привело к:

def int2day (day_int):
  if day_int == 1:
    return 'Sunday'
  elif day_int == 2:
    return 'Monday'
  elif day_int == 3:
    return 'Tuesday'
  elif day_int == 4:
    return 'Wednesday'
  elif day_int == 5:
    return 'Thursday'
  elif day_int == 6:
    return 'Friday'
  elif day_int == 7:
    return 'Saturday'
  else:
    return 'FAIL'

spark.udf.register("day", int2day, IntegerType())
df2 = df.withColumn("Day", day("Day_of_Week"))

и дает длинную ошибку

SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 8, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 262, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 257, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 325, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 141, in dump_stream
    self._write_with_length(obj, stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 151, in _write_with_length
    serialized = self.dumps(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 556, in dumps
    return pickle.dumps(obj, protocol)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Я не понимаю, как я могу применить Как пройтипостоянное значение Python UDF? здесь, так как их пример был намного проще (только true или false)

Я также пытался использовать функции карты, как в PySpark добавить столбец вDataFrame из столбца TimeStampType

, но

df3 = df2.withColumn("weekday", map(lambda x: int2day, col("Date"))) просто говорит, что TypeError: argument 2 to map() must support iteration, но я думал, что col поддерживает итерацию.

Я прочитал каждый пример в Интернете, который смог найти.Я не понимаю, как применить другие вопросы к моему делу.

Как добавить другой столбец, используя функцию другого столбца?

1 Ответ

0 голосов
/ 26 октября 2018

Вам вообще не нужен UDF, чтобы выполнить то, что вы пытаетесь сделать. Вы можете использовать встроенную функцию pyspark date_format, чтобы извлечь имя для каждого дня недели, указанного в столбце.

import pyspark.sql.functions as func
df = df.withColumn("day_of_week", func.date_format(func.col("Date"), "EEEE"))

Результатом является добавление нового столбца к вашему фрейму данных с именем day_of_week, который будет отображать воскресенье, понедельник, вторник и т. Д. В зависимости от значения в столбце Date.

...