Есть ли способ вычесть столбец «A», присутствующий в Stream1, из столбца «B», присутствующий в Stream2? - PullRequest
1 голос
/ 10 апреля 2019

Я читаю 2 потока (stream1 & stream2) из ​​Kafka в потоковой структурированной искре (pyspark). Я должен рассчитать разницу между смещениями stream1 и stream2.

Я пытаюсь что-то вроде этого:

<class 'pyspark.sql.dataframe.DataFrame'>
root
|--timestamp: timestamp (nullable = true)
|-- value: string (nullable = true)
|-- offset: double (nullable = true)
|-- string_val: string (nullable = true)
|-- ping: double (nullable = true)
|-- date: string (nullable = true)
|-- time: string (nullable = true)
|-- offset_v1: double (nullable = true)
|-- date_time: string (nullable = true)
|-- date_format: timestamp (nullable = true)

<class 'pyspark.sql.dataframe.DataFrame'>
|-- Mean: double (nullable = true)
|-- pingTime: timestamp (nullable = true)
|-- Std_Deviation: double (nullable = true)
|-- devTime: timestamp (nullable = true)
|-- offset_v2: double (nullable = true)
|-- upperBound: double (nullable = true)
|-- lowerBound: double (nullable = true)

stream2 = stream2.withColumn('difference',stream2.offset_v2-stream1.offset_v1)

Выдает ошибку:

pyspark.sql.utils.AnalysisException: u'Resolved атрибут (ы) offset_v1 # 95 отсутствует в upperBound # 182, Std_Deviation # 149, lowerBound # 189, Mean # 133, pingTime # 129-T30000ms, devTime # 144-T30000ms, offset_v2 # 155 в операторе! Проект [Mean # 133, pingTime # 129-T30000ms, Std_Deviation # 149, devTime # 144-T30000ms, offset_v2 # 155, upperBound # 182, lowerBound # 189, (offset_v2 # 155 - offset_v1 # 95) AS Разница # 233]

1 Ответ

0 голосов
/ 11 апреля 2019

Как сказал Венки, вам нужно сначала присоединиться, чтобы сравнить соответствующие строки вместе.У вас есть колонка для этого?A date ir id сделают свое дело.Предположим, у вас есть один файл с именем join_col в обоих фреймах данных :

from pyspark.sql.functions import *

stream_final = stream1.join(stream2, 'join_col', 'inner')

# Now compute difference by adding a new column 'offset_diff':

stream_final = stream_final.withColumn('offset_diff', stream_final.offset_v1 - stream_final.offset_v2)

Если вы не можете найти правильное соединение, это проблема для случаев, когда высравните столбцы разной длины, с которыми, я полагаю, вы имеете дело.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...