Pyspark принять последнее обновленное значение из столбца - PullRequest
0 голосов
/ 09 ноября 2018

У меня есть следующий фрейм данных:

+----+--------+--------+------+
| id | value1 | value2 | flag |
+----+--------+--------+------+
|  1 | 7000   | 30     |   0  |
|  2 | 0      | 9      |   0  |
|  3 | 23627  | 17     |   1  |
|  4 | 8373   | 23     |   0  |
|  5 | -0.5   | 4      |   1  |
+----+--------+--------+------+

Я хочу выполнить следующие условия-
1. Если значение больше 0, я хочу, чтобы предыдущие строки значение2
2. Если значение равно 0, я хочу среднее значение предыдущей строки и значение следующей строки2
3. Если значение меньше 0, то NULL
Поэтому я написал следующий код -

df = df.withColumn('value2',when(col(value1)>0,lag(col(value2))).when(col(value1)==0,\
                   (lag(col(value2))+lead(col(value2)))/2.0).otherwise(None))

То, что я хочу, это чтобы у меня было обновленное значение, когда я беру значение предыдущей и следующей строк, например, следующее. Он должен идти в порядке их нахождения, сначала для id-1, обновить его, затем для id-2 взять обновленное значение и т. Д.

+----+--------+--------+------+
| id | value1 | value2 | flag |
+----+--------+--------+------+
|  1 | 7000   | null   |   0  |
|  2 | 0      | 8.5    |   0  |
|  3 | 23627  | 8.5    |   1  |
|  4 | 8373   | 8.5    |   0  |
|  5 | -0.5   | null   |   1  |
+----+--------+--------+------+

Я попытался просто указать id == 1, когда, переназначить фрейм данных, а затем снова выполнить с колонкой, когда выполняются операции.

df = df.withColumn('value2',when((col(id)==1)&(col(value1)>0,lag(col(value2)))
\.when((col(id)==1)&col(value1)==0,(lag(col(value2))+lead(col(value2)))/2.0)\
.when((col(id)==1)&col(col(value1)<0,None).otherwise(col(value2))

После этого я получу обновленное значение столбца, и если я снова выполню ту же операцию для id == 2, я смогу получить обновленное значение. Но я конечно не могу сделать это для каждого идентификатора. Как мне этого добиться?

Ответы [ 2 ]

0 голосов
/ 11 июля 2019

Я думаю, будет сложно сделать это полностью без зацикливания. Но вы можете разделить работу между разными исполнителями и подмножествами в пандах с помощью udf. Чтобы это работало, должно быть достаточно точек останова (то есть точек данных, где значение меньше 0, и вы вставляете NULL).

Импорт:

from pyspark.sql import Window
from pyspark.sql.functions import last
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
import pandas as pd
import numpy as np
from pyspark.sql.functions import col, lit, when

Входные данные:

df = spark.createDataFrame([[ 1, 7000.0, 30.0 ], [ 2, 0.0, 9.0], [3, 23628.0, 17.0], [4, 8373.0, 23.0], [5, -0.5, 4.0]], [ 'id', 'value1', 'value2' ]).cache()

Добавление следующего значения2 и установка точек останова, когда значение меньше 0:

dfwithnextvalue = df.alias("a").join(df.alias("b"), col("a.id") == col("b.id") - lit(1), 'left').select("a.*", col("b.value2").alias("nextvalue"))
dfstartnew = dfwithnextvalue.withColumn("startnew", when(col("value1") < lit(0), col("id")).otherwise(lit(None)))\
.withColumn("startnew", when(col("id") == lit(1), lit(1)).otherwise(col("startnew")))
window = Window.orderBy('id')
rolled = last(col('startnew'), ignorenulls=True).over(window)
dfstartnewrolled = dfstartnew.withColumn("startnew", rolled)

Теперь мы можем группировать по столбцу startnew и обрабатывать каждую фигуру в пандах. Мои знания панд не велики, но, похоже, это работает:

@pandas_udf("id long, value1 double, value2 double", PandasUDFType.GROUPED_MAP)
def loopdata(df):
  df = df.set_index('id').sort_index()
  for i in range(0, len(df.index)):
    if i == 0:
      df.loc[df.index[0], 'value2'] = np.nan
    elif df.loc[df.index[i], 'value1'] < 0:
      df.loc[df.index[i], 'value2'] = np.nan
    elif df.loc[df.index[i], 'value1'] > 0:
      df.loc[df.index[i], 'value2'] = df.loc[df.index[i-1], 'value2']
    else:
      nextvalue = df.loc[df.index[i], 'nextvalue']
      if pd.isna(nextvalue):
        nextvalue = 0
      prevvalue = df.loc[df.index[i-1], 'value2']
      if pd.isna(prevvalue):
        prevvalue = 0
      df.loc[df.index[i], 'value2'] = (nextvalue + prevvalue)/2.0
  df = df.drop(columns=['nextvalue', 'startnew'])
  df = df.reset_index()
  return df

Теперь вы можете вычислить результат:

dfstartnewrolled.groupBy("startnew").apply(loopdata)
0 голосов
/ 10 ноября 2018
from pyspark.sql import SparkSession    
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window


spark = SparkSession \
    .builder \
    .appName('test') \
    .getOrCreate()


tab_data = spark.sparkContext.parallelize(tab_inp)
##
schema = StructType([StructField('id',IntegerType(),True),
                     StructField('value1',FloatType(),True),
                     StructField('value2',IntegerType(),True),
                     StructField('flag',IntegerType(),True)
                    ])

table = spark.createDataFrame(tab_data,schema)
table.createOrReplaceTempView("table")
dummy_df=table.withColumn('dummy',lit('dummy'))
pre_value=dummy_df.withColumn('pre_value',lag(dummy_df['value2']).over(Window.partitionBy('dummy').orderBy('dummy')))

cmb_value=pre_value.withColumn('next_value',lead(dummy_df['value2']).over(Window.partitionBy('dummy').orderBy('dummy')))

new_column=when(col('value1')>0,cmb_value.pre_value) \
            .when(col('value1')<0,cmb_value.next_value)\
            .otherwise((cmb_value.pre_value+cmb_value.next_value)/2)


final_table=cmb_value.withColumn('value',new_column)

Над "final_table" будет поле, которое вы ожидаете.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...