Мой класс enum вызывает проблемы при выполнении задания Pyspark.Если я запускаю код за пределами Spark, проблем не возникает.Я использую Python 3.6.
Класс Enum:
class DatumType(Enum):
COOL = 0
EXTREMELY_COOL = 1
MAN_WHAT_A_GREAT_FEATURE = 2
PRETTY_GOOD_TOO = 3
THE_BEST_ONE = 4
Класс, который использует Enum:
class FeatureDescriptor:
description: str
identifier: str
datum_type: DatumType
def __init__(self, identifier: str, description: str, datum_type: DatumType) -> None:
self.description = description
self.identifier = identifier
self.datum_type = datum_type
Каждая созданная функция имеет прикрепленный FeatureDescriptor,Если я запускаю это вне работы Spark, никаких проблем не возникает.Если я запускаю его как часть задания Spark, происходит следующее:
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage 136.0 (TID 918800, ip-10-0-0-167.ec2.internal, executor 19): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/livy/appcache/application_1569268438115_0001/container_1569268438115_0001_01_000336/pyspark.zip/pyspark/worker.py", line 364, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/mnt/yarn/usercache/livy/appcache/application_1569268438115_0001/container_1569268438115_0001_01_000336/pyspark.zip/pyspark/worker.py", line 69, in read_command
command = serializer._read_with_length(file)
File "/mnt/yarn/usercache/livy/appcache/application_1569268438115_0001/container_1569268438115_0001_01_000336/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
return self.loads(obj)
File "/mnt/yarn/usercache/livy/appcache/application_1569268438115_0001/container_1569268438115_0001_01_000336/pyspark.zip/pyspark/serializers.py", line 580, in loads
return pickle.loads(obj, encoding=encoding)
File "/mnt/yarn/usercache/livy/appcache/application_1569268438115_0001/container_1569268438115_0001_01_000336/virtualenv_application_1569268438115_0001_0/lib64/python3.6/enum.py", line 135, in __new__
enum_members = {k: classdict[k] for k in classdict._member_names}
AttributeError: 'dict' object has no attribute '_member_names'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Ключевая часть выглядит следующим образом:
enum_members = {k: classdict[k] for k in classdict._member_names}
AttributeError: 'dict' object has no attribute '_member_names'
Из того, что я понимаю, объект словаря _member_names долженбыть создан при создании экземпляра класса Enum.Это происходит без проблем, пока Spark не участвует.
Я убедился, что записная книжка Jupyter вызывает правильную версию класса Enum и не использует стороннюю библиотеку.