Я использую Pyspark и Kafka для обработки данных через прямые потоки
Я создал функцию, которая читает поток Кафки по пакетам и вычисляет средние значения данных для каждой партии.
Я хочу того же, но когда значения для второй партии должны быть средними значениями для первой и второй партии (вся история ) Я имею в виду).Для третьей партии среднее значение должно быть средним для первой + второй + третьей партии и т. Д.
. Более того, если первая партия может быть обновлена , новые значения рассчитаны сзначения последней партии, это было бы здорово:)
Это то, что я сделал до сих пор:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pandas as pd
import numpy as np
import json, time, os.path
kafka_brokers = "localhost:9092"
kafka_core_topic = "test"
sc = SparkContext(appName = "test-kafka")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 3)
kvs = KafkaUtils.createDirectStream(ssc, [kafka_core_topic], {"metadata.broker.list": kafka_brokers})
parsed = kvs.map(lambda x: json.loads(x[1]))
@pandas_udf('double')
def mean_score(col):
return pd.Series([np.mean(col)] * len(col))
def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
globals()["sparkSessionSingletonInstance"] = SparkSession \
.builder \
.config(conf = sparkConf)\
.getOrCreate()
return globals()["sparkSessionSingletonInstance"]
def process(time, rdd):
print("========= %s =========" % str(time))
parquetfile = "sparkstream.parquet"
spark = getSparkSessionInstance(rdd.context.getConf())
schema = StructType([
StructField('name', StringType()),
StructField('score', IntegerType())
])
data = spark.read.json(rdd, schema = schema)
data = data.withColumn('mean_score', mean_score(data['score']))
data.show()
if os.path.isdir(parquetfile):
data.write.mode('append').parquet(parquetfile)
else:
data.write.parquet(parquetfile)
parsed.foreachRDD(process)
ssc.start()
ssc.awaitTermination()
Это дает следующий результат:
Большое спасибо за помощь :)