функция искрового окна условный перезапуск - PullRequest
0 голосов
/ 24 мая 2018

Я пытаюсь вычислить значение действия, которое не было получено из дополнительного зачисления.

Ввод:

+------+--------+------+
|period|activity|credit|
+------+--------+------+
|     1|       5|     0|
|     2|       0|     3|
|     3|       4|     0|
|     4|       0|     3|
|     5|       1|     0|
|     6|       1|     0|
|     7|       5|     0|
|     8|       0|     1|
|     9|       0|     1|
|    10|       5|     0|
+------+--------+------+

Выход:

rdd = sc.parallelize([(5,0,5),(0,3,0),(4,0,1),(0,3,0),(1,0,0),(1,0,0),(5,0,4),(0,1,0),(0,1,0),(5,0,3)])
df = rdd.toDF(["activity","credit","realActivity"])

+------+--------+------+------------+
|period|activity|credit|realActivity|
+------+--------+------+------------+
|     1|       5|     0|           5|
|     2|       0|     3|           0|
|     3|       4|     0|           1|
|     4|       0|     3|           0|
|     5|       1|     0|           0|
|     6|       1|     0|           0|
|     7|       5|     0|           4|
|     8|       0|     1|           0|
|     9|       0|     1|           0|
|    10|       5|     0|           3|
+------+--------+------+------------+

Я пытался создатьстолбец кредитного баланса, который добавляет и вычитает в зависимости от типа строки, но я не могу перезапустить его условно (каждый раз, когда он опускается ниже нуля) в зависимости от самого себя.Это похоже на рекурсивную проблему, которую я не уверен, как превратить в дружественный pyspark.Очевидно, что я не могу сделать следующее, самостоятельно ссылаясь на предыдущее значение ..

w = Window.orderBy("period")

df = df.withColumn("realActivity", lag("realActivity",1,0).over(w) - lag("credit", 1, 0).over(w) - lag("activity",1,0).over(w) )

Обновление: Как было отмечено, это невозможно при расчете окна.Поэтому я хотел бы сделать что-то вроде приведенного ниже фрагмента для расчета creditBalance, который позволил бы мне вычислить realActivity.

df['creditBalance']=0     
for i in range(1, len(df)):
    if (df.loc[i-1, 'creditBalance']) > 0:
        df.loc[i, 'creditBalance'] = df.loc[i-1, 'creditBalance'] + df.loc[i, 'credit'] - df.loc[i, 'activity']
    elif df.loc[i, 'creditamount'] > 0:
        df.loc[i, 'creditBalance'] = df.loc[i, 'credit'] - df.loc[i, 'activity']

Теперь мой единственный вопрос: как я могу применить эту "локальную" функцию к каждой группе в кадре данных spark?

  • записать кадр данных в файлы по группам и процессамлокально?
  • настраивать карту и собирать строки для локального выполнения?
  • сворачивать строки в одну строку по группе и обрабатывать это?
  • что-нибудь еще?

1 Ответ

0 голосов
/ 30 мая 2018

@ pansen, я решил проблему с помощью следующего кода.Это может быть полезно, если вы пытаетесь решить подобную проблему.

def creditUsage(rows):
    '''
    Input:
    timestamp, activity, credit
    ['1;5;0', '2;0;3', '3;4;0', '4;0;3', '5;1;0', '6;1;0', '7;5;0', '8;0;1', '9;0;1', '10;5;0']

    Output:
    [timestamp; creditUsage]
    '''
    timestamps = [int(r.split(";")[0]) for r in rows]
    rows = [r for _,r in sorted(zip(timestamps,rows))]

    print(rows)
    timestamp, trActivity, credit = zip(*[(int(ts), float(act), float(rbonus)) for r in rows for [ts, act, rbonus] in [r.split(";")]])
    creditBalance,creditUsage = [0.0] * len(credit), [0.0] * len(credit)

    for i in range(0, len(trActivity)):
        creditBalance[i] = creditBalance[i-1]+credit[i]
        """ if bonusBalance greater than activity then actitivity is the usage, if not, than bonusBalance """
        creditUsage[i] =  creditBalance[i] if creditBalance[i] - trActivity[i] <0 else trActivity[i]
        creditBalance[i] += (- creditUsage[i])

    output = ["{0};{1:02}".format(t_, r_) for t_, r_ in zip(timestamp, creditUsage)]
    return(output)

realBonusUDF = udf(creditUsage,ArrayType(StringType()))

a= df.withColumn('data', concat_ws(';', col('period'), col('activity'), col('credit'))) \
  .groupBy('userID').agg(collect_list('data').alias('data')) \
  .withColumn('data', realBonusUDF('data')) \
  .withColumn("data", explode("data")) \
  .withColumn("data", split("data", ";")) \
  .withColumn("timestamp", col('data')[0].cast("int")) \
  .withColumn("creditUsage", col('data')[1].cast("float")) \
  .drop('data')

Вывод:

+------+---------+-----------+
|userID|timestamp|creditUsage|
+------+---------+-----------+
|   123|        1|        0.0|
|   123|        2|        0.0|
|   123|        3|        3.0|
|   123|        4|        0.0|
|   123|        5|        1.0|
|   123|        6|        1.0|
|   123|        7|        1.0|
|   123|        8|        0.0|
|   123|        9|        0.0|
|   123|       10|        2.0|
+------+---------+-----------+
...