Spark структурированная потоковая передача - возможно ли использовать функциональность Spark структурированного окна без агрегирования? - PullRequest
0 голосов
/ 09 июня 2019

Я работаю над набором данных CSV и обрабатываю их с помощью потоковой передачи. Я могу применить пакетную обработку, используя оконную функцию в потоковой передаче искры. Есть ли способ, которым я могу сделать то же самое с использованием потоковой структурированной искры без использования функции агрегирования? Все примеры, доступные в Интернете, используют параметр groupBy. Я просто хочу разделить данные на пакеты без агрегирования с использованием структурированной потоковой передачи.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import *

def foreach_batch_function(df, epoch_id):
    #df = df.select(split('value',','))
    #df.show()
    print(type(df))
    df = df.toPandas()
    df = df.value.str.split("," ,expand=True)
    df.show()

spark = SparkSession.builder.appName("TurbineDataAnalytics").getOrCreate()

lines = spark.readStream.format("socket").option("host", "localhost").option("port", 8887).load()

lines = lines.groupBy(window(lines.value, "10 minutes", "5 minutes"), lines.value).count()

query = lines.writeStream.foreachBatch(foreach_batch_function).start()

query.awaitTermination()

Пример данных:

Date_Time,Rt_avg,Q_avg,Rs_avg,Rm_avg,Ws_avg,Nu_avg

12/31/16 18:00,12,12.18,9.3500004,742.70001,4.5599999,700.33002

12/31/16 18:10,12,11.35,9.4799995,788.98999,4.9899998,698.03998

12/31/16 18:20,12,11.05,9.2399998,654.10999,4.8400002,700.16998

12/31/16 18:30,12,12,9.5,795.71997,4.6999998,699.37

1 Ответ

0 голосов
/ 15 июня 2019

В соответствии с тем, что вы упомянули в комментариях, вы хотите знать, как вы можете разделить столбец значений вашего фрейма данных и как вы можете применять скользящие окна без группировки.

Вы можете разделить столбец значений с помощью функции split и применить скользящие окна с помощью выбора.Посмотрите на псевдокод ниже:

import pyspark.sql.functions as F
#readstream
lines = lines.select(lines.value)

split_col = F.split(df.value, ',')

lines = lines.withColumn('Date_Time', split_col.getItem(0))
lines = lines.withColumn('Rt_avg', split_col.getItem(1))
lines = lines.withColumn('Q_avg', split_col.getItem(2))
lines = lines.withColumn('Rs_avg', split_col.getItem(3))
lines = lines.withColumn('Rm_avg', split_col.getItem(4))
lines = lines.withColumn('Ws_avg',split_col.getItem(5))
lines = lines.withColumn('Nu_avg',split_col.getItem(6))

w = lines.select(F.window("Date_Time", "5 seconds"))
#writeStream
...