Массовая загрузка документов в эластичный кластер AWS с использованием искры на раздел не удалась из-за PicklingError: Невозможно сериализовать объект сокета - PullRequest
0 голосов
/ 30 октября 2019

У меня есть искомые данные для передачи задания в ElasticDocument и загрузки их в кластерasticSearch. Реализация задания выглядит следующим образом:

origin_df.rdd.foreachPartition(lambda partition: self._create_es_document(partition))

def _create_es_document(self, partition: List[Row]) -> None:
    es_docs = []
    for row in partition:
        es_document = _convert_to_es_document(row)
        es_docs.append(poi_document)
        success, failed = bulk(self.es, (es_doc.to_dict(True) for es_doc in es_docs), True)

Вот стековая дорожка:

File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 806, in foreachPartition
    self.mapPartitions(func).count()  # Force evaluation
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
    vals = self.mapPartitions(func).collect()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2532, in _jrdd
    self._jrdd_deserializer, profiler)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2434, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 600, in dumps
    raise pickle.PicklingError(msg)

Сообщение:

Could not serialize object: TypeError: Cannot serialize socket object

Не уверен, что объект сокета от массовой операции,И как я могу решить это?

Спасибо, Эдвард

...