использование методов класса Python в RDD - PullRequest
2 голосов
/ 01 июля 2019

Мой вопрос может звучать несколько похоже на это и это , но попытки их решения также не помогли мне.
У меня есть токенизатор класса, определенный как * 1006

class Tokenizer:
    def __init__(self, preserve_case=False):
        self.preserve_case = preserve_case

    def tokenize(self, s):
        """
        Argument: s -- any string or unicode object
        Value: a tokenize list of strings; conatenating this list returns the original string if preserve_case=False
        """        
        # Try to ensure unicode:
        try:
            s = str(s)
        except UnicodeDecodeError:
            s = s.encode('string_escape')
            s = str(s)
        # Fix HTML character entitites:
        s = self.__html2unicode(s)
        # Tokenize:
        words = word_re.findall(s)
        # Possible alter the case, but avoid changing emoticons like :D into :d:
        if not self.preserve_case:            
            words = map((lambda x : x if emoticon_re.search(x) else x.lower()), words)
        return words
tok=Tokenizer(preserve_case=False)

У меня есть (ключ, значение) RDD (user_id, твиты). Я хочу использовать твиты СДР в функции токенизации класса токенизатор. То, что я сделал, было -

rdd.foreach(lambda x:tok.tokenize(x[1])).take(5)  

и получил ошибку-

Объект 'NoneType' не имеет атрибута 'take'

Я тоже пробовал -

rdd1.map(lambda x:tok.tokenize(x[1])).take(5)  

и получил ошибку-

Py4JJavaError Traceback (последний последний вызов) в () ----> 1 rdd1.map (лямбда-х: tok.tokenize (x 1 )). Take (5)

~ / anaconda3 / lib / python3.6 / site-packages / pyspark / rdd.py в дубле (self, num) 1358 1359 p = range (partsScanned, min (partsScanned + numPartsToTry, totalParts)) -> 1360 res = self.context.runJob (self, takeUpToNumLeft, p) 1361 1362 элемента + = res

~ / anaconda3 / lib / python3.6 / site-packages / pyspark / context.py в runJob (self, rdd, partitionFunc, partition, allowLocal) 1067

SparkContext # runJob. 1068 mappedRDD = rdd.mapPartitions (partitionFunc)

-> 1069 sock_info = self._jvm.PythonRDD.runJob (self._jsc.sc (), mappedRDD._jrdd, разделы) 1070 return список (_load_from_socket (sock_info, mappedRDD._jrdd_deserializer))
1071

~ / anaconda3 / lib / python3.6 / site-packages / py4j / java_gateway.py в call (self, * args) 1255 answer = self.gateway_client.send_command (команда) 1256 return_value = get_return_value ( -> ответ 1257, self.gateway_client, self.target_id, self.name) 1258 1259 для temp_arg в temp_args:

~ / anaconda3 / lib / python3.6 / site-packages / py4j / protocol.py в get_return_value (ответ, gateway_client, target_id, name) 326 поднять Py4JJavaError ( 327 "Произошла ошибка при вызове {0} {1} {2}. \ N". -> 328 формат (target_id, ".", Name), значение) Еще 329: 330 рейз Py4JError (

Py4JJavaError: во время вызова произошла ошибка г: org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: задание прервано из-за сбоя этапа: Задача 0 на этапе 39.0 не выполнена 1 раз, последний сбой: потерянная задача 0.0 на этапе 39.0 (TID 101, localhost, драйвер исполнителя): org.apache.spark.api.python.PythonException: обратная связь (самая последняя последний звонок): File "/Home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", линия 377, в основном файл process () "/home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", строка 372, в процессе Файл serializer.dump_stream (func (split_index, iterator), outfile) "/Home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", строка 397, в dump_stream bytes = self.serializer.dumps (vs) Файл "/home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", линия 576, в отвалах return pickle.dumps (obj, protocol) AttributeError: Невозможно выбрать локальный объект 'Tokenizer.tokenize ..'

в org.apache.spark.api.python.BasePythonRunner $ ReaderIterator.handlePythonException (PythonRunner.scala: 452) в org.apache.spark.api.python.PythonRunner $$ Анон $ 1.read (PythonRunner.scala: 588) в org.apache.spark.api.python.PythonRunner $$ Анон $ 1.read (PythonRunner.scala: 571) в org.apache.spark.api.python.BasePythonRunner $ ReaderIterator.hasNext (PythonRunner.scala: 406) в org.apache.spark.InterruptibleIterator.hasNext (InterruptibleIterator.scala: 37)в scala.collection.Iterator $ class.foreach (Iterator.scala: 891) в org.apache.spark.InterruptibleIterator.foreach (InterruptibleIterator.scala: 28) в scala.collection.generic.Growable $ класса $ плюс $ плюс $ эк (Growable.scala: 59). в scala.collection.mutable.ArrayBuffer $ плюс $ плюс $ эк (ArrayBuffer.scala: 104). в scala.collection.mutable.ArrayBuffer $ плюс $ плюс $ эк (ArrayBuffer.scala: 48). в scala.collection.TraversableOnce $ class.to (TraversableOnce.scala: 310) в org.apache.spark.InterruptibleIterator.to (InterruptibleIterator.scala: 28) в scala.collection.TraversableOnce $ class.toBuffer (TraversableOnce.scala: 302) в org.apache.spark.InterruptibleIterator.toBuffer (InterruptibleIterator.scala: 28) в scala.collection.TraversableOnce $ class.toArray (TraversableOnce.scala: 289) в org.apache.spark.InterruptibleIterator.toArray (InterruptibleIterator.scala: 28) в org.apache.spark.api.python.PythonRDD $$ anonfun $ 3.Apply (PythonRDD.scala: 153) в org.apache.spark.api.python.PythonRDD $$ anonfun $ 3.Apply (PythonRDD.scala: 153) в org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.Apply (SparkContext.scala: 2101) в org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.Apply (SparkContext.scala: 2101) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 90) в org.apache.spark.scheduler.Task.run (Task.scala: 121) в org.apache.spark.executor.Executor $ TaskRunner $$ anonfun $ 10.apply (Executor.scala: 408) в org.apache.spark.util.Utils $ .tryWithSafeFinally (Utils.scala: 1360) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 414) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) at java.lang.Thread.run (Thread.java:748)

трассировка стека драйверов: в org.apache.spark.scheduler.DAGScheduler.org $ апача $ искры $ планировщик $ DAGScheduler $$ failJobAndIndependentStages (DAGScheduler.scala: 1889) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.Apply (DAGScheduler.scala: 1877) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.Apply (DAGScheduler.scala: 1876) в scala.collection.mutable.ResizableArray $ class.foreach (ResizableArray.scala: 59) в scala.collection.mutable.ArrayBuffer.foreach (ArrayBuffer.scala: 48) в org.apache.spark.scheduler.DAGScheduler.abortStage (DAGScheduler.scala: 1876) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.Apply (DAGScheduler.scala: 926) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.Apply (DAGScheduler.scala: 926) в scala.Option.foreach (Option.scala: 257) в org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed (DAGScheduler.scala: 926) в org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive (DAGScheduler.scala: 2110) в org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala: 2059) в org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala: 2048) в org.apache.spark.util.EventLoop $$ anon $ 1.run (EventLoop.scala: 49) в org.apache.spark.scheduler.DAGScheduler.runJob (DAGScheduler.scala: 737) в org.apache.spark.SparkContext.runJob (SparkContext.scala: 2061) в org.apache.spark.SparkContext.runJob (SparkContext.scala: 2082) в org.apache.spark.SparkContext.runJob (SparkContext.scala: 2101) в org.apache.spark.api.python.PythonRDD $ .runJob (PythonRDD.scala: 153) в org.apache.spark.api.python.PythonRDD.runJob (PythonRDD.scala) в sun.reflect.NativeMethodAccessorImpl.invoke0 (собственный метод) в sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke (Method.java:498) вpy4j.reflection.MethodInvoker.invoke (MethodInvoker.java:244) в py4j.reflection.ReflectionEngine.invoke (ReflectionEngine.java:357) в py4j.Gateway.invoke (Gateway.java:282) в py4j.commands.inkeom.(AbstractCommand.java:132) на py4j.commands.CallCommand.execute (CallCommand.java:79) на py4j.GatewayConnection.run (GatewayConnection.java:238) на java.lang.Thread.run (Thread.java:748)Вызывается: org.apache.spark.api.python.PythonException: обратная связь (последний вызов был последним): файл "/home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip / pyspark / worker.py ", строка 377, в файле основного процесса ()" /home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py ", строка 372, в процессе serializer.dump_stream (func (split_index, iterator), outfile) Файл" /home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark / serializers.py ", строка 397, в файле dump_stream bytes = self.serializer.dumps (vs)" /home/kriti/Downloads/spark-2.4.3-bin-hadoop2.7 / python / lib / pyspark.zip / pyspark / serializers.py ", строка 576, в дампах возвращает pickle.dumps (obj, protocol) AttributeError: Невозможно выбрать локальный объект 'Tokenizer.tokenize ..'

в org.apache.spark.api.python.BasePythonRunner $ ReaderIterator.handlePythonException (PythonRunner.scala: 452) в org.apache.spark.api.python.PythonRunner $$ anon $ 1.read (PythonRunner.scala: 588) в org.apache.spark.api.python.PythonRunner $$ anon $ 1.read (PythonRunner.scala: 571) в org.apache.spark.api.python.BasePythonRunner $ ReaderIterator.hasNext (PythonRunner.scala: 406) в org.apache.spark.InterruptibleIterator.hasNext (InterruptibleIterator.scala: 37) в scala.collection.Iterator $ class.foreach (Iterator.scala: 891) в org.apache.spark.InterruptibleIterator.foreach (InterruptibleIterator.scala:28) в scala.collection.generic.Growable $ class. $ Plus $ plus $ eq (Growable.scala: 59) в scala.collection.mutable.ArrayBuffer. $ Plus $ plus $ eq (ArrayBuffer.scala: 104) в scala.collection.mutable.ArrayBuffer. $ плюс $ плюс $ эк (ArrayBuffer.scala: 48) в scala.collection.TraversableOnce $ class.to (TraversableOnce.scala: 310) в org.apache.spark.InterruptibleIterator.to (InterruptibleIterator.scala: 28) в scala.collection.TraversableOnce $ class.toBuffer(TraversableOnce.scala: 302) в org.apache.spark.InterruptibleIterator.toBuffer (InterruptibleIterator.scala: 28) в scala.collection.TraversableOnce $ class.toArray (TraversableOnce.scala: 289) в org.apache.spark.Intorruble.toArray (InterruptibleIterator.scala: 28) в org.apache.spark.api.python.PythonRDD $$ anonfun $ 3.apply (PythonRDD.scala: 153) в org.apache.spark.api.python.PythonRDD $$ anonfun $ 3.применить (PythonRDD.scala: 153) в org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply (SparkContext.scala: 2101) в org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply (SparkContext.scala: 2101) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 90) в org.apache.spark.scheduler.Task.run (Task.scala: 121) в org.apache.spark.executor.Executor $ TaskRunner $$ anonfun $ 10.apply (Executor.scala: 408) в org.apache.spark.util.Utils $ .tryWithSafeFinally (Utils.scala: 1360) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 414) в Java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) ... еще 1

Любая помощь будетбыть благодарным.Заранее спасибо!

1 Ответ

0 голосов
/ 02 июля 2019

rdd.foreach(lambda x:tok.tokenize(x[1])).take(5)

Здесь вы пытаетесь получить доступ к результатам rdd.foreach () , что является нулем.

rdd1.map(lambda x:tok.tokenize(x[1])).take(5)

Здесь вы используете пользовательский объект с лямбдой, который выдает следующее исключение:

AttributeError: Невозможно выбрать локальный объект 'Tokenizer.tokenize ..'

, что практически означает, что pyspark не может сериализовать метод Tokenizer.tokenize. Одним из возможных решений является вызов tok.tokenize(x[1]) из функции, а затем передача ссылки на эту функцию на карте, как показано ниже:

def tokenize(x):
  return tok.tokenize(x[0])

rdd1.map(tokenize).take(5)

Также в вашем коде есть еще одна проблема. Класс Tokenizer пытается получить доступ к необъявленному методу self.__html2unicode(s). Это приведет к следующей ошибке:

AttributeError: 'Tokenizer' object has no attribute '_Tokenizer__html2unicode'

Похожие темы

PySpark: PicklingError: Не удалось сериализовать объект: TypeError: невозможно выбрать объекты CompiledFFI

https://github.com/yahoo/TensorFlowOnSpark/issues/198

...