Я в новинку с spark, и я использую PySpark для запуска списка пользователей, которые подключились к моему приложению через Facebook Login.
В кластере установлен Spark 2.3.1.5 экземпляров + мастер, на каждом я установил библиотеку "facebook-sdk".Когда я запускаю одну команду, я получаю ответ от Facebook GraphAPI и могу использовать его по своему усмотрению.Проблема в том, что я использую команды GraphAPI внутри функции и пытаюсь запустить ее с помощью mapPartitions.
Это мой код:
import facebook
import pyspark
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
def fb_func(rows):
fields='id,first_name,middle_name,last_name,gender,birthday'
ret = []
for row in rows:
token = row[6]
id = row[7]
graph = facebook.GraphAPI(access_token=token, version=3.0)
user = graph.get_object(id=id, fields=fields)
ret.append(str(id) + "," + str(token) + "," + str(user['first_name']) + "," + str(user['middle_name']) + "," + str(user['last_name']) + "," + str(user['gender']) + "," + str(user['birthday']))
return ret
file = spark.read.parquet("s3://parquet-files/fb_data/*")
fb_file = file.repartition(5)
fb_schema = StructType([
StructField('user_id', LongType(), True),
StructField('token', StringType(), True),
StructField('first_name', StringType(), True),
StructField('middle_name', StringType(), True),
StructField('last_name', StringType(), True),
StructField('gender', StringType(), True),
StructField('birthday', StringType(), True)
])
fb_rdd = fb_file.rdd\
.mapPartitions(fb_func)\
.toDF(fb_schema)\
.repartition(2)\
.collect()
Это ошибка, которую я получаю обратно:
Py4JJavaError: An error occurred while calling o98.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 29, ip-185-16-116-233.ec2.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/worker.py", line 217, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/worker.py", line 59, in read_command
command = serializer._read_with_length(file)
File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
return self.loads(obj)
File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/serializers.py", line 562, in loads
return pickle.loads(obj)
File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/cloudpickle.py", line 929, in subimport
__import__(name)
ImportError: ('No module named facebook', <function subimport at 0x7feea89b26e0>, ('facebook',))
Я удостоверился, что библиотека FB установлена во всех экземплярах, поэтому я понятия не имею, почему я получаю эту ошибку ... Если кто-нибудь знает, почему, я был бы очень признателен за любую помощьприйти!