Конвертируйте Spark Структурированный DataFrame в Pandas, используя pandas_udf - PullRequest
0 голосов
/ 20 мая 2019

Мне нужно прочитать CSV-файлы в виде потока, а затем преобразовать это в pandas dataframe.

Вот что я сделал до сих пор


    DataShema = StructType([ StructField("TimeStamp", LongType(), True), \
    StructField("Count", IntegerType(), True), \
    StructField("Reading", FloatType(), True) ])

    group_columns = ['TimeStamp','Count','Reading']

    @pandas_udf(DataShema, PandasUDFType.GROUPED_MAP)
    def get_pdf(pdf):
        return pd.DataFrame([pdf[group_columns]],columns=[group_columns])

    # getting Surge data from the files
    SrgDF = spark \
        .readStream \
        .schema(DataShema) \
        .csv("ProcessdedData/SurgeAcc")

    mydf = SrgDF.groupby(group_columns).apply(get_pdf)

    qrySrg = SrgDF \
        .writeStream \
        .format("console") \
        .start() \
        .awaitTermination()

Я считаю, что из другого источника ( Преобразование потоковых структур данных Spark Structure в Pandas DataFrame ), которые преобразуют структурированный потоковый кадр данных вПанды напрямую не возможны, и кажется, что pandas_udf - правильный подход, но не может понять, как именно этого добиться.Мне нужно, чтобы кадр данных pandas передавался в мои функции.

Редактировать

, когда я запускаю код (скорее изменяя запрос на mydfчем SrgDF), тогда я получаю следующую ошибку: pyspark.sql.utils.StreamingQueryException: 'Writing job aborted.\n=== Streaming Query ===\nIdentifier: [id = 18a15e9e-9762-4464-b6d1-cb2db8d0ac41, runId = e3da131e-00d1-4fed-82fc-65bf377c3f99]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {FileStreamSource[file:/home/mls5/Work_Research/Codes/Misc/Python/MachineLearning_ArtificialIntelligence/00_Examples/01_ApacheSpark/01_ComfortApp/ProcessdedData/SurgeAcc]: {"logOffset":0}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nFlatMapGroupsInPandas [Count#1], get_pdf(TimeStamp#0L, Count#1, Reading#2), [TimeStamp#10L, Count#11, Reading#12]\n+- Project [Count#1, TimeStamp#0L, Count#1, Reading#2]\n +- StreamingExecutionRelation FileStreamSource[file:/home/mls5/Work_Research/Codes/Misc/Python/MachineLearning_ArtificialIntelligence/00_Examples/01_ApacheSpark/01_ComfortApp/ProcessdedData/SurgeAcc], [TimeStamp#0L, Count#1, Reading#2]\n' 19/05/20 18:32:29 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver /usr/local/lib/python3.6/dist-packages/pyarrow/__init__.py:152: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream warnings.warn("pyarrow.open_stream is deprecated, please use ".

EDIT-2

Вот код для воспроизведенияошибка

import sys

from pyspark import SparkContext
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

from pyspark.streaming import StreamingContext

from pyspark.sql.types import *

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyarrow as pa

import glob

#####################################################################################

if __name__ == '__main__' :

    spark = SparkSession \
        .builder \
        .appName("RealTimeIMUAnalysis") \
        .getOrCreate() 

    spark.conf.set("spark.sql.execution.arrow.enabled", "true")

    # reduce verbosity
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    ##############################################################################

    # using the saved files to do the Analysis
    DataShema = StructType([ StructField("TimeStamp", LongType(), True), \
    StructField("Count", IntegerType(), True), \
    StructField("Reading", FloatType(), True) ])

    group_columns = ['TimeStamp','Count','Reading']

    @pandas_udf(DataShema, PandasUDFType.GROUPED_MAP)
    def get_pdf(pdf):
        return pd.DataFrame([pdf[group_columns]],columns=[group_columns])

    # getting Surge data from the files
    SrgDF = spark \
        .readStream \
        .schema(DataShema) \
        .csv("SurgeAcc")

    mydf = SrgDF.groupby('Count').apply(get_pdf)
    #print(mydf)

    qrySrg = mydf \
        .writeStream \
        .format("console") \
        .start() \
        .awaitTermination()

Для запуска необходимо создать папку с именем SurgeAcc, в которой находится код, и создать внутри файл csv в следующем формате:

TimeStamp,Count,Reading
1557011317299,45148,-0.015494
1557011317299,45153,-0.015963
1557011319511,45201,-0.015494
1557011319511,45221,-0.015494
1557011315134,45092,-0.015494
1557011315135,45107,-0.014085
1557011317299,45158,-0.015963
1557011317299,45163,-0.015494
1557011317299,45168,-0.015024`

1 Ответ

0 голосов
/ 20 мая 2019

Ваш возвращаемый кадр данных pandas_udf не соответствует указанной схеме.

Обратите внимание, что входом для pandas_udf будет pandas dataframe, а также он возвращает pandas dataframe.

Вы можете использовать все функции панд внутри pandas_udf. Единственное, что вы должны убедиться, что ReturnDataShema должна совпадать с фактическим выводом функции.

ReturnDataShema = StructType([StructField("TimeStamp", LongType(), True), \
                            StructField("Count", IntegerType(), True), \
                            StructField("Reading", FloatType(), True), \
                            StructField("TotalCount", FloatType(), True)])

@pandas_udf(ReturnDataShema, PandasUDFType.GROUPED_MAP)
    def get_pdf(pdf):
        # This following stmt is causing schema mismatch
        # return pd.DataFrame([pdf[group_columns]],columns=[group_columns])
        # If you want to return all the rows of pandas dataframe
        # you can simply
        # return pdf
        # If you want to do any aggregations, you can do like the below, or use pandas query
        # but make sure the return pandas dataframe complies with ReturnDataShema
        total_count = pdf['Count'].sum()
        return pd.DataFrame([(pdf.TimeStamp[0],pdf.Count[0],pdf.Reading[0],total_count)])
...