Как преобразовать фрейм данных в искровую структурированную потоковую передачу с использованием Python? - PullRequest
0 голосов
/ 20 мая 2019

Я тестирую структурированную потоковую передачу, используя localhost, с которого он читает поток данных. Ввод потоковых данных с локального узла:

ID   Subject  Marks
--------------------
1    Maths    85  
1    Physics  80  
2    Maths    70  
2    Physics  80  

Я хотел бы получить средние оценки для каждого уникального идентификатора.

Я пробовал это, но не смог преобразовать DF, который является единственным значением.

Ниже мой код:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *  
from pyspark.sql.types import *
spark = SparkSession.builder.appName("SrteamingAge").getOrCreate()

schema = StructType([StructField("ID", IntegerType(), \  
True),StructField("Subject", StringType(), True),StructField("Marks", \
IntegerType(), True)])

marks = spark.readStream.format("socket").option("host", 
"localhost").option("port", 9999).schema(schema).load()
marks.printSchema()
result = marks.groupBy("ID").agg(avg("Marks").alias("Average Marks"))

Но я получаю следующую ошибку:

    root
      |-- value: string (nullable = true)

Pyspark.sql.utils.Analysisexception: "u can not resolve 'ID' given input columns: [value];"

Я создаю схему для того же, но не повезло. Любая помощь будет оценена.

Мой ожидаемый результат - всего 2 столбца (ID и среднее значение)

ID  Average Marks  
1     82.5  
2     75  

1 Ответ

0 голосов
/ 08 июня 2019

В вашем фрейме данных нет столбца с именем ID, но вы пытаетесь сгруппировать его. Вам нужно разделить столбец с именем «значение» следующим образом:

df = marks\
  .withColumn("value", split(col("value"),"\\,"))  \
  .select(
    col("value").getItem(0).cast("int").alias("ID"),
    col("value").getItem(1).alias("Subject"),
    col("value").getItem(2).cast("int").alias("Marks")) \
  .drop("value")

Затем группа по df:

result = df.groupBy("ID").agg(avg("Marks").as("Average Marks"))

Предположение: ввод имеет вид 1,Maths,85 и т. Д.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...