разница потоков между столбцами A и B, агрегированная по столбцам C и D - PullRequest
0 голосов
/ 29 февраля 2020

Как мне передать в таблицу следующее:

разница между столбцами A и B, агрегированными по столбцам C и D.

+-------------+-------------------+--+-
| Column_A|Column_B |Column_C|Column_D|
+-------------+-------------------+--+-
|52       |67       |boy     |car     |
|44       |25       |girl    |bike    |
|98       |85       |boy     |car     |
|52       |41       |girl    |car     |
+-------------+-------------------+--+-

Это моя попытка, но это не работает:

difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C")
differenceStream = difference.writeStream\
  .queryName("diff_aggr")\
  .format("memory").outputMode("append")\
  .start()

Я получаю эту ошибку: объект «GroupedData» не имеет атрибута «writeStream»

1 Ответ

0 голосов
/ 29 февраля 2020

В зависимости от того, как вы хотите агрегировать сгруппированные данные - вы можете сделать, например,

Предпосылки (если вы их еще не установили):

from pyspark.sql import functions as F 
from pyspark.sql.functions import *

Для sum :

difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.sum(F.col("Difference")).alias("Difference"))

Для max:

difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.max(F.col("Difference")).alias("Difference"))

А затем:

differenceStream = difference.writeStream\
  .queryName("diff_aggr")\
  .format("memory").outputMode("append")\
  .start()

Дело в том - если вы делаете groupBy, вам также необходимо уменьшить путем агрегирования. Если вы хотите отсортировать значения вместе, попробуйте df.sort(...)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...