Pyspark - усреднение значений для каждого пакета данных потоков Kafka - PullRequest
0 голосов
/ 07 декабря 2018

Я использую Pyspark и Kafka для обработки данных через прямые потоки

Я создал функцию, которая читает поток Кафки по пакетам и вычисляет средние значения данных для каждой партии.

Я хочу того же, но когда значения для второй партии должны быть средними значениями для первой и второй партии (вся история ) Я имею в виду).Для третьей партии среднее значение должно быть средним для первой + второй + третьей партии и т. Д.

. Более того, если первая партия может быть обновлена ​​, новые значения рассчитаны сзначения последней партии, это было бы здорово:)

Это то, что я сделал до сих пор:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pandas as pd
import numpy as np
import json, time, os.path


kafka_brokers = "localhost:9092"
kafka_core_topic = "test"


sc = SparkContext(appName = "test-kafka")
sc.setLogLevel("ERROR")

ssc = StreamingContext(sc, 3)
kvs = KafkaUtils.createDirectStream(ssc, [kafka_core_topic], {"metadata.broker.list": kafka_brokers})

parsed = kvs.map(lambda x: json.loads(x[1]))

@pandas_udf('double')
def mean_score(col):
    return pd.Series([np.mean(col)] * len(col))

def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
    globals()["sparkSessionSingletonInstance"] = SparkSession \
        .builder \
        .config(conf = sparkConf)\
        .getOrCreate()
return globals()["sparkSessionSingletonInstance"]

def process(time, rdd):
    print("========= %s =========" % str(time))
    parquetfile = "sparkstream.parquet"

    spark = getSparkSessionInstance(rdd.context.getConf())

    schema = StructType([
        StructField('name', StringType()),
        StructField('score', IntegerType())
    ])

    data = spark.read.json(rdd, schema = schema)

    data = data.withColumn('mean_score', mean_score(data['score']))

    data.show()

    if os.path.isdir(parquetfile):
        data.write.mode('append').parquet(parquetfile)
    else:
        data.write.parquet(parquetfile)


parsed.foreachRDD(process)

ssc.start()
ssc.awaitTermination()

Это дает следующий результат:

enter image description here

Большое спасибо за помощь :)

...