Вызов API Facebook Graph в PySpark - PullRequest
       0

Вызов API Facebook Graph в PySpark

0 голосов
/ 09 декабря 2018

Я в новинку с spark, и я использую PySpark для запуска списка пользователей, которые подключились к моему приложению через Facebook Login.

В кластере установлен Spark 2.3.1.5 экземпляров + мастер, на каждом я установил библиотеку "facebook-sdk".Когда я запускаю одну команду, я получаю ответ от Facebook GraphAPI и могу использовать его по своему усмотрению.Проблема в том, что я использую команды GraphAPI внутри функции и пытаюсь запустить ее с помощью mapPartitions.

Это мой код:

import facebook
import pyspark
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

def fb_func(rows):
    fields='id,first_name,middle_name,last_name,gender,birthday'
    ret = []
    for row in rows:
        token = row[6]
        id = row[7]
        graph = facebook.GraphAPI(access_token=token, version=3.0)
        user = graph.get_object(id=id, fields=fields)

        ret.append(str(id) + "," + str(token) + "," + str(user['first_name']) + "," + str(user['middle_name']) + "," + str(user['last_name']) + "," + str(user['gender']) + "," + str(user['birthday']))

    return ret

file = spark.read.parquet("s3://parquet-files/fb_data/*")
fb_file = file.repartition(5)

fb_schema = StructType([
    StructField('user_id', LongType(), True), 
    StructField('token', StringType(), True), 
    StructField('first_name', StringType(), True), 
    StructField('middle_name', StringType(), True), 
    StructField('last_name', StringType(), True), 
    StructField('gender', StringType(), True), 
    StructField('birthday', StringType(), True)
])

fb_rdd = fb_file.rdd\
    .mapPartitions(fb_func)\
    .toDF(fb_schema)\
    .repartition(2)\
    .collect()

Это ошибка, которую я получаю обратно:

Py4JJavaError: An error occurred while calling o98.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 29, ip-185-16-116-233.ec2.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/worker.py", line 217, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/worker.py", line 59, in read_command
    command = serializer._read_with_length(file)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/serializers.py", line 562, in loads
    return pickle.loads(obj)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1540215271155_0064/container_1540215271155_0064_01_000002/pyspark.zip/pyspark/cloudpickle.py", line 929, in subimport
    __import__(name)
ImportError: ('No module named facebook', <function subimport at 0x7feea89b26e0>, ('facebook',))

Я удостоверился, что библиотека FB установлена ​​во всех экземплярах, поэтому я понятия не имею, почему я получаю эту ошибку ... Если кто-нибудь знает, почему, я был бы очень признателен за любую помощьприйти!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...