Мне удалось решить проблему, но это далеко не элегантно. Если кто-то может предложить решение, которое не записывается на диск, я буду очень признателен, и выберу ваш ответ как правильный.
Мне удалось сериализовать DataFrame, используя df.rdd.saveAsPickleFile()
, сжать полученный каталог, передать его клиенту python, записать полученный ZIP-файл на диск, распаковать его, затем использовать SparkContext().pickleFile
перед окончательной загрузкой DataFrame. Я думаю, что это далеко от идеала.
API:
import shutil
import tempfile
from fastapi import APIRouter
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from starlette.responses import FileResponse
router = APIRouter()
sc = SparkContext('local')
spark = SparkSession(sc)
df: spark.createDataFrame = spark.read.parquet('gs://my-bucket/my-file.parquet')
@router.get("/applications")
def applications():
temp_parquet = tempfile.NamedTemporaryFile()
temp_parquet.close()
df.rdd.saveAsPickleFile(temp_parquet.name)
shutil.make_archive('test', 'zip', temp_parquet.name)
return FileResponse('test.zip')
Клиент:
import io
import zipfile
import requests
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)
response = requests.get("http://0.0.0.0:5000/applications")
file_like_object = io.BytesIO(response.content)
with zipfile.ZipFile(file_like_object) as z:
z.extractall('temp.data')
rdd = sc.pickleFile("temp.data")
df = spark.createDataFrame(rdd)
print(df.head())