Я пытаюсь создать новый столбец из другого столбца в Apache Spark.
Данные (сильно сокращенно) выглядят как
Date Day_of_Week
2018-05-26T00:00:00.000+0000 5
2018-05-05T00:00:00.000+0000 6
и должны выглядеть как
Date Day_of_Week Weekday
2018-05-26T00:00:00.000+0000 5 Thursday
2018-05-05T00:00:00.000+0000 6 Friday
Я попробовал совет из руководства https://docs.databricks.com/spark/latest/spark-sql/udf-python.html#register-the-function-as-a-udf & Как передать постоянное значение в Python UDF? & PySpark добавить столбец в DataFrame из столбца TimeStampType
, что привело к:
def int2day (day_int):
if day_int == 1:
return 'Sunday'
elif day_int == 2:
return 'Monday'
elif day_int == 3:
return 'Tuesday'
elif day_int == 4:
return 'Wednesday'
elif day_int == 5:
return 'Thursday'
elif day_int == 6:
return 'Friday'
elif day_int == 7:
return 'Saturday'
else:
return 'FAIL'
spark.udf.register("day", int2day, IntegerType())
df2 = df.withColumn("Day", day("Day_of_Week"))
и дает длинную ошибку
SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 8, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 262, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 257, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/databricks/spark/python/pyspark/serializers.py", line 325, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/databricks/spark/python/pyspark/serializers.py", line 141, in dump_stream
self._write_with_length(obj, stream)
File "/databricks/spark/python/pyspark/serializers.py", line 151, in _write_with_length
serialized = self.dumps(obj)
File "/databricks/spark/python/pyspark/serializers.py", line 556, in dumps
return pickle.dumps(obj, protocol)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Я не понимаю, как я могу применить Как пройтипостоянное значение Python UDF? здесь, так как их пример был намного проще (только true или false)
Я также пытался использовать функции карты, как в PySpark добавить столбец вDataFrame из столбца TimeStampType
, но
df3 = df2.withColumn("weekday", map(lambda x: int2day, col("Date")))
просто говорит, что TypeError: argument 2 to map() must support iteration
, но я думал, что col
поддерживает итерацию.
Я прочитал каждый пример в Интернете, который смог найти.Я не понимаю, как применить другие вопросы к моему делу.
Как добавить другой столбец, используя функцию другого столбца?