Я работаю над набором данных CSV и обрабатываю их с помощью потоковой передачи. Я могу применить пакетную обработку, используя оконную функцию в потоковой передаче искры. Есть ли способ, которым я могу сделать то же самое с использованием потоковой структурированной искры без использования функции агрегирования? Все примеры, доступные в Интернете, используют параметр groupBy. Я просто хочу разделить данные на пакеты без агрегирования с использованием структурированной потоковой передачи.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import *
def foreach_batch_function(df, epoch_id):
#df = df.select(split('value',','))
#df.show()
print(type(df))
df = df.toPandas()
df = df.value.str.split("," ,expand=True)
df.show()
spark = SparkSession.builder.appName("TurbineDataAnalytics").getOrCreate()
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 8887).load()
lines = lines.groupBy(window(lines.value, "10 minutes", "5 minutes"), lines.value).count()
query = lines.writeStream.foreachBatch(foreach_batch_function).start()
query.awaitTermination()
Пример данных:
Date_Time,Rt_avg,Q_avg,Rs_avg,Rm_avg,Ws_avg,Nu_avg
12/31/16 18:00,12,12.18,9.3500004,742.70001,4.5599999,700.33002
12/31/16 18:10,12,11.35,9.4799995,788.98999,4.9899998,698.03998
12/31/16 18:20,12,11.05,9.2399998,654.10999,4.8400002,700.16998
12/31/16 18:30,12,12,9.5,795.71997,4.6999998,699.37