I have source data that a Spark job converts into ElasticDocument objects and loads into an ElasticSearch cluster. The job implementation looks like this:
    from typing import List

    from elasticsearch.helpers import bulk
    from pyspark.sql import Row

    origin_df.rdd.foreachPartition(lambda partition: self._create_es_document(partition))

    def _create_es_document(self, partition: List[Row]) -> None:
        es_docs = []
        for row in partition:
            es_document = _convert_to_es_document(row)
            es_docs.append(es_document)
        # stats_only=True: bulk() returns (success_count, error_count)
        success, failed = bulk(self.es, (es_doc.to_dict(True) for es_doc in es_docs), True)
Here is the stack trace:
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 806, in foreachPartition
self.mapPartitions(func).count() # Force evaluation
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
vals = self.mapPartitions(func).collect()
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2532, in _jrdd
self._jrdd_deserializer, profiler)
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2434, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 600, in dumps
raise pickle.PicklingError(msg)
The error message is:
Could not serialize object: TypeError: Cannot serialize socket object
I'm not sure whether the socket object comes from the bulk operation. How can I solve this?
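My guess is that the lambda captures self, so Spark tries to pickle the whole object, including self.es, whose Elasticsearch client holds an open socket. Below is a rough sketch of the workaround I'm considering: create the client inside a module-level function so it is built on each executor instead of being serialized. The host URL is a placeholder, and _convert_to_es_document stands in for my actual conversion helper:

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    def _index_partition(partition):
        # Build the client on the executor, so no open socket has to be
        # pickled into the closure ("http://es-host:9200" is a placeholder).
        es = Elasticsearch(["http://es-host:9200"])
        actions = (_convert_to_es_document(row).to_dict(True) for row in partition)
        # stats_only=True: bulk() returns (success_count, error_count)
        success, failed = bulk(es, actions, True)

    origin_df.rdd.foreachPartition(_index_partition)

Is that the right way to avoid serializing the socket, or is there a cleaner approach?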
Thanks, Edward