Как вставить объединенные значения из фрейма данных в другой фрейм данных в Pyspark? - PullRequest
1 голос
/ 30 мая 2019

Я создаю столбец time_interval и добавляю его в существующий фрейм данных в Pyspark . В идеале интервал времени должен быть в формате « ЧЧмм », а минуты округляться до ближайшей 15-минутной отметки (815, 830, 845, 900 и т. Д.).

У меня есть код spark sql, который выполняет мою логику, но как мне взять это значение, которое объединяется в виде строкового столбца, и вставить его в существующий фрейм данных?

time_interval = sqlContext.sql("select extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15")

time_interval.show()

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|concat(CAST(hour(current_timestamp()) AS STRING), CAST((FLOOR((CAST(minute(current_timestamp()) AS DOUBLE) / CAST(15 AS DOUBLE))) * CAST(15 AS BIGINT)) AS STRING))|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                               1045|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+

baseDF = sqlContext.sql("select * from test_table")
newBase = baseDF.withColumn("time_interval", lit(str(time_interval)))

newBase.select("time_interval").show()

+--------------------+
|       time_interval|
+--------------------+
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
+--------------------+
only showing top 20 rows

Таким образом, фактические ожидаемые результаты должны просто показывать фактическое строковое значение в новом столбце, который я создаю, а не это объединенное значение из фрейма данных. Примерно так:

newBase.select("time_interval").show(1)
+-------------+
|time_interval|
+-------------+
|    1045     |                                                                                                                                           
+-------------+

1 Ответ

0 голосов
/ 30 мая 2019

Поскольку time_interval - это тип данных, для этого случая необходимо collect и extract the required value out from dataframe.

Попробуйте следующим образом:

newBase = baseDF.withColumn("time_interval", lit(str(time_interval.collect()[0][0])))
newBase.show()

(или)

Используя функцию select(expr()):

newBase = baseDF.select("*",expr("string(extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15) AS time_interval"))

Как pault , упомянутое в комментариях, с использованием selectExpr() функции:

newBase = baseDF.selectExpr("*","string(extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15) AS time_interval")

Пример:

>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import IntegerType
>>> time_interval = spark.sql("select extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15")
>>> baseDF=spark.createDataFrame([1,2,3,4],IntegerType())
>>> newBase = baseDF.withColumn("time_interval", lit(str(time_interval.collect()[0][0])))
>>> newBase.show()
+-----+-------------+
|value|time_interval|
+-----+-------------+
|    1|         1245|
|    2|         1245|
|    3|         1245|
|    4|         1245|
+-----+-------------+
...