Как собрать и реструктурировать данные в фрейме pyspark (по столбцам) - PullRequest
0 голосов
/ 05 июня 2019

Я пытаюсь объединить данные в фрейме данных pyspark по определенным критериям.Я пытаюсь выровнять действие на основе количества switchOUT, чтобы переключить количество.Таким образом, аккаунты с переключением денег становятся со счета, а другие аккаунты становятся to_accounts.

Данные, которые я получаю в кадре данных, начинаются с

+--------+------+-----------+----------+----------+-----------+ 
| person | acct | close_amt | open_amt | switchIN | switchOUT | 
+--------+------+-----------+----------+----------+-----------+ 
| A      | 1    |       125 | 50       | 75       | 0         | 
+--------+------+-----------+----------+----------+-----------+ 
| A      | 2    |       100 | 75       | 25       | 0         | 
+--------+------+-----------+----------+----------+-----------+ 
| A      | 3    |       200 | 300      | 0        | 100       | 
+--------+------+-----------+----------+----------+-----------+ 

К этой таблице

+--------+--------+-----------+----------+----------+
| person | from_acct| to_acct | switchIN | switchOUT| 
+--------+----------+--------+----------+-----------+ 
| A      | 3        |      1 | 75       | 100       |
+--------+----------+--------+----------+-----------+
| A      | 3        |      2 | 25       | 100       | 
+--------+----------+--------+----------+-----------+ 

Итакже как я могу сделать это так, чтобы он работал для N количества строк (не только для 3 учетных записей)

До сих пор я использовал этот код


# define udf
def sorter(l):
  res = sorted(l, key=operator.itemgetter(1))
  return [item[0] for item in res]

def list_to_string(l):
  res = 'from_fund_' +str(l[0]) + '_to_fund_'+str(l[1])
  return res

def listfirstAcc(l):
    res = str(l[0])
    return res

def listSecAcc(l):
    res = str(l[1])
    return res


sort_udf = F.udf(sorter)
list_str = F.udf(list_to_string)
extractFirstFund = F.udf(listfirstAcc)
extractSecondFund = F.udf(listSecAcc)


# Add additional columns
df= df.withColumn("move", sort_udf("list_col").alias("sorted_list"))
df= df.withColumn("move_string", list_str("move"))
df= df.withColumn("From_Acct",extractFirstFund("move"))
df= df.withColumn("To_Acct",extractSecondFund("move"))

Текущий результат, который я получаю:

+--------+--------+-----------+----------+----------+
| person | from_acct| to_acct | switchIN | switchOUT| 
+--------+----------+--------+----------+-----------+ 
| A      | 3        |    1,2 | 75       | 100       |
+--------+----------+--------+----------+-----------+
...