Можно ли разрешить пользователям загружать результат фрейма данных pyspark в FastAPI или Flask - PullRequest
0 голосов
/ 15 января 2020

Я работаю над API с использованием FastAPI, к которому пользователи могут обратиться с запросом, чтобы произошло следующее:

  1. Сначала запрос get заберет файл из Google Cloud Storage и загрузить его в pyspark DataFrame
  2. Затем приложение выполнит некоторые преобразования в DataFrame
  3. Наконец, я хочу записать DataFrame на диск пользователя в виде файла паркета.

Я не могу понять, как доставить файл пользователю в формате паркета, по нескольким причинам:

  • df.write.parquet('out/path.parquet') записывает данные в каталог на out/path.parquet что представляет собой проблему, когда я пытаюсь передать его starlette.responses.FileResponse
  • Передача одного файла .parquet, который, как я знаю, существует, starlette.responses.FileResponse, кажется, просто выводит двоичный файл на мою консоль (как показано в моем коде ниже) )
  • Запись DataFrame в поток BytesIO , как в pandas, казалась многообещающей, но я не могу понять, как это сделать, используя любой из методов DataFrame или DataFrame.rdd. ethods.

Возможно ли это даже в FastAPI? Возможно ли это в Flask с использованием send_file () ?

Вот код, который у меня пока есть. Обратите внимание, что я попробовал несколько вещей, таких как закомментированный код, но безрезультатно.

import tempfile

from fastapi import APIRouter
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from starlette.responses import FileResponse


router = APIRouter()
sc = SparkContext('local')
spark = SparkSession(sc)

df: spark.createDataFrame = spark.read.parquet('gs://my-bucket/sample-data/my.parquet')

@router.get("/applications")
def applications():
    df.write.parquet("temp.parquet", compression="snappy")
    return FileResponse("part-some-compressed-file.snappy.parquet")
    # with tempfile.TemporaryFile() as f:
    #     f.write(df.rdd.saveAsPickleFile("temp.parquet"))
    #     return FileResponse("test.parquet")

Спасибо!

Редактировать: я пытался использовать ответы и предоставленную информацию здесь , но я не могу заставить его работать.

1 Ответ

0 голосов
/ 16 января 2020

Мне удалось решить проблему, но это далеко не элегантно. Если кто-то может предложить решение, которое не записывается на диск, я буду очень признателен, и выберу ваш ответ как правильный.

Мне удалось сериализовать DataFrame, используя df.rdd.saveAsPickleFile(), сжать полученный каталог, передать его клиенту python, записать полученный ZIP-файл на диск, распаковать его, затем использовать SparkContext().pickleFile перед окончательной загрузкой DataFrame. Я думаю, что это далеко от идеала.

API:

import shutil
import tempfile

from fastapi import APIRouter
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from starlette.responses import FileResponse


router = APIRouter()
sc = SparkContext('local')
spark = SparkSession(sc)

df: spark.createDataFrame = spark.read.parquet('gs://my-bucket/my-file.parquet')

@router.get("/applications")
def applications():
    temp_parquet = tempfile.NamedTemporaryFile()
    temp_parquet.close()
    df.rdd.saveAsPickleFile(temp_parquet.name)

    shutil.make_archive('test', 'zip', temp_parquet.name)

    return FileResponse('test.zip')

Клиент:

import io
import zipfile

import requests

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

response = requests.get("http://0.0.0.0:5000/applications")
file_like_object = io.BytesIO(response.content)
with zipfile.ZipFile(file_like_object) as z:
    z.extractall('temp.data')

rdd = sc.pickleFile("temp.data")
df = spark.createDataFrame(rdd)

print(df.head())
...