В моем коде я использую pyspark
для манипулирования данными, python graphene
для построения graphql
сервера вокруг моих данных и flask
для обслуживания API GraphQL.Но я сталкиваюсь с некоторыми проблемами, которые я не знаю, почему это происходит.
В основном код не вычисляет правильные результаты, и я предполагаю, что в целом может возникнуть проблема параллелизма.
Вот упрощение моего кода:
import graphene
from flask import Flask
from flask_graphql import GraphQLView
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
app = Flask(__name__)
spark = SparkSession.builder \
.master("local") \
.appName("Sencivil") \
.config("spark.driver.allowMultipleContexts", "true") \
.getOrCreate()
df = spark.read.format("csv")\
.option("sep", ";")\
.option("header", "true")\
.load("./people.csv")\
.cache()
class Birthdate(graphene.ObjectType):
boys = graphene.Int()
girls = graphene.Int()
def __init__(self, bd):
self.bd = bd
def resolve_boys(self, info):
boys = df.where(df.gender == 1).groupby("birthdate").count()
return boys.count()
def resolve_girls(self, info):
girls = df.where(df.gender == 2).groupby("birthdate").count()
return girls.count()
class Person(graphene.ObjectType):
born = graphene.List(Birthdate)
def resolve_born(self, info):
bds = [Birthdate(row.asDict()["birthdate"])
for row in df.select("birthdate").collect()]
return bds
class Query(graphene.ObjectType):
people = graphene.Field(Person)
def resolve_people(self, info):
return Person()
schema = graphene.Schema(query=Query)
app.add_url_rule(
"/graphql", view_func=GraphQLView.as_view('graphql', schema=schema, graphiql=True))
if __name__ == "__main__":
app.run(debug=True)
people.csv
birthdate;gender
13-01-1987;1
02-09-1986;2
13-01-1987;1
02-09-1986;2
12-04-1998;1
Проблема заключается в resolve_boys
и resolve_girls
методов класса Birthdate
, где я ожидаю получить 2 для мальчиков и 1 для девочек, как мы можем видеть непосредственно из оболочки pyspark:
>>> boys = df.where(df.gender == 1).groupby("birthdate").count()
>>> boys.show()
+----------+-----+
| birthdate|count|
+----------+-----+
|13-01-1987| 2|
|12-04-1998| 1|
+----------+-----+
>>> girls = df.where(df.gender == 2).groupby("birthdate").count()
>>> girls.show()
+----------+-----+
| birthdate|count|
+----------+-----+
|02-09-1986| 2|
+----------+-----+
Но вместо этого я простополучить 0 от API:
Так как решить эту проблему?
Если я изменил код, где он читаетCSV-файл для создания локального кадра данных, как это
# df = spark.read.format("csv")\
# .option("sep", ";")\
# .option("header", "true")\
# .load("./people.csv")\
# .cache()
df = spark.createDataFrame(
[{'birthdate': "13-01-1987", "gender": 1},
{'birthdate': "02-09-1986", "gender": 2},
{'birthdate': "13-01-1987", "gender": 1},
{'birthdate': "02-09-1986", "gender": 2},
{'birthdate': "12-04-1998", "gender": 1}]
)
я получаю правильный ответ:
Таким образом, проблема возникаетпри чтении файла CSV.Но почему это происходит?