Как я могу объединить pyspark, графен и колбу вместе? - PullRequest
0 голосов
/ 13 апреля 2019

В моем коде я использую pyspark для манипулирования данными, python graphene для построения graphql сервера вокруг моих данных и flask для обслуживания API GraphQL.Но я сталкиваюсь с некоторыми проблемами, которые я не знаю, почему это происходит.

В основном код не вычисляет правильные результаты, и я предполагаю, что в целом может возникнуть проблема параллелизма.

Вот упрощение моего кода:

import graphene
from flask import Flask
from flask_graphql import GraphQLView

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

app = Flask(__name__)

spark = SparkSession.builder \
    .master("local") \
    .appName("Sencivil") \
    .config("spark.driver.allowMultipleContexts", "true") \
    .getOrCreate()

df = spark.read.format("csv")\
    .option("sep", ";")\
    .option("header", "true")\
    .load("./people.csv")\
    .cache()


class Birthdate(graphene.ObjectType):
    boys = graphene.Int()
    girls = graphene.Int()

    def __init__(self, bd):
        self.bd = bd

    def resolve_boys(self, info):
        boys = df.where(df.gender == 1).groupby("birthdate").count()
        return boys.count()

    def resolve_girls(self, info):
        girls = df.where(df.gender == 2).groupby("birthdate").count()
        return girls.count()


class Person(graphene.ObjectType):
    born = graphene.List(Birthdate)

    def resolve_born(self, info):
        bds = [Birthdate(row.asDict()["birthdate"])
               for row in df.select("birthdate").collect()]
        return bds


class Query(graphene.ObjectType):
    people = graphene.Field(Person)

    def resolve_people(self, info):
        return Person()


schema = graphene.Schema(query=Query)

app.add_url_rule(
    "/graphql", view_func=GraphQLView.as_view('graphql', schema=schema, graphiql=True))


if __name__ == "__main__":
    app.run(debug=True)

people.csv

birthdate;gender
13-01-1987;1
02-09-1986;2
13-01-1987;1
02-09-1986;2
12-04-1998;1

Проблема заключается в resolve_boys и resolve_girls методов класса Birthdate, где я ожидаю получить 2 для мальчиков и 1 для девочек, как мы можем видеть непосредственно из оболочки pyspark:

>>> boys = df.where(df.gender == 1).groupby("birthdate").count()
>>> boys.show()
+----------+-----+                                                              
| birthdate|count|
+----------+-----+
|13-01-1987|    2|
|12-04-1998|    1|
+----------+-----+

>>> girls = df.where(df.gender == 2).groupby("birthdate").count()
>>> girls.show()
+----------+-----+                                                              
| birthdate|count|
+----------+-----+
|02-09-1986|    2|
+----------+-----+

Но вместо этого я простополучить 0 от API:

GraphiQL

Так как решить эту проблему?

Если я изменил код, где он читаетCSV-файл для создания локального кадра данных, как это

# df = spark.read.format("csv")\
#     .option("sep", ";")\
#     .option("header", "true")\
#     .load("./people.csv")\
#     .cache()

df = spark.createDataFrame(
    [{'birthdate': "13-01-1987", "gender": 1},
     {'birthdate': "02-09-1986", "gender": 2}, 
     {'birthdate': "13-01-1987", "gender": 1},
     {'birthdate': "02-09-1986", "gender": 2}, 
     {'birthdate': "12-04-1998", "gender": 1}]
)

я получаю правильный ответ:

enter image description here

Таким образом, проблема возникаетпри чтении файла CSV.Но почему это происходит?

...