Цикл в кадрах данных Spark с использованием Python - PullRequest
0 голосов
/ 18 июня 2019

Я хочу перебрать искровой фрейм данных, проверить, является ли условие, т. Е. Агрегированное значение нескольких строк, истинным / ложным, а затем создать фрейм данных.Пожалуйста, посмотрите схему кода, не могли бы вы помочь исправить код?Я довольно новичок в Spark, и борьба с Python может пройти через это, любая помощь очень ценится

сортировка сделок по инструменту и дате (в порядке возрастания)

dfsorted = df.orderBy('Instrument','Date').show()

новая временная переменнаячтобы отслеживать сумму суммы

sumofquantity = 0 

для каждой строки в dfsorted

sumofquantity = sumofquantity + dfsorted['Quantity']

продолжайте добавлять строки, зацикленные до сих пор в этом новом кадре данных с именем dftemp

dftemp= dfsorted (how to write this?)



if ( sumofquantity == 0)

как только сумма складывается равной нулю, для всех строк в tempview добавьте новый столбец с уникальным последовательным номером

и добавьте строки в окончательный кадр данных

dffinal= dftemp.withColumn('trade#', assign a unique trade number)

сбросьте сумму суммывернуться к 0

sumofquantity = 0

очистить dftemp-как очистить фрейм данных, чтобы я мог начать с нуля строк для следующей итерации?

trade_sample.csv (необработанный входной файл)

Customer ID,Instrument,Action,Date,Price,Quantity 
U16,ADM6,BUY,20160516,0.7337,2
U16,ADM6,SELL,20160516,0.7337,-1
U16,ADM6,SELL,20160516,0.9439,-1
U16,CLM6,BUY,20160516,48.09,1
U16,CLM6,SELL,20160517,48.08,-1
U16,ZSM6,BUY,20160517,48.09,1
U16,ZSM6,SELL,20160518,48.08,-1

Ожидаемый результат (обратите внимание на последний новый столбец - это все, что я пытаюсь добавить)

Customer ID,Instrument,Action,Date,Price,Quantity,trade#
U16,ADM6,BUY,20160516,0.7337,2,10001
U16,ADM6,SELL,20160516,0.7337,-1,10001 
U16,ADM6,SELL,20160516,0.9439,-1,10001 
U16,CLM6,BUY,20160516,48.09,1,10002 
U16,CLM6,SELL,20160517,48.08,-1,10002 
U16,ZSM6,BUY,20160517,48.09,1,10003 
U16,ZSM6,SELL,20160518,48.08,-1,10003

1 Ответ

1 голос
/ 18 июня 2019

Зацикливание таким образом не является хорошей практикой. Вы не можете добавлять / суммировать фрейм данных кумулятивно и очищать неизменяемый фрейм данных. Для решения вашей проблемы вы можете использовать концепцию оконного спаркера. Насколько я понимаю вашу проблему, вы хотите рассчитать сумму Quantity для каждого customer ID. После завершения сумма для одного идентификатора клиента вы сбрасываете sumofquantity на ноль. Если это так, то вы можете разделить Customer ID с порядком на Instrument, Date и рассчитать сумму для каждого Customer ID. Как только вы получите сумму, вы можете проверить trade# с вашими условиями.

просто см. Ниже код:

    >>> from pyspark.sql.window import Window
    >>> from pyspark.sql.functions import row_number,col,sum
    >>> w = Window.partitionBy("Customer ID").orderBy("Instrument","Date")
    >>> w1 = Window.partitionBy("Customer ID").orderBy("Instrument","Date","rn")
    >>> dftemp =  Df.withColumn("rn", (row_number().over(w))).withColumn("sumofquantity", sum("Quantity").over(w1)).select("Customer_ID","Instrument","Action","Date","Price","Quantity","sumofquantity")
    >>> dftemp.show()
+-----------+----------+------+--------+------+--------+-------------+
|Customer_ID|Instrument|Action|    Date| Price|Quantity|sumofquantity|
+-----------+----------+------+--------+------+--------+-------------+
|        U16|      ADM6|   BUY|20160516|0.7337|       2|            2|
|        U16|      ADM6|  SELL|20160516|0.7337|      -1|            1|
|        U16|      ADM6|  SELL|20160516|0.9439|      -1|            0|
|        U16|      CLM6|   BUY|20160516| 48.09|       1|            1|
|        U16|      CLM6|  SELL|20160517| 48.08|      -1|            0|
|        U16|      ZSM6|   BUY|20160517| 48.09|       1|            1|
|        U16|      ZSM6|  SELL|20160518| 48.08|      -1|            0|
+-----------+----------+------+--------+------+--------+-------------+

Вы можете обратиться к оконной функции по ссылке ниже:

https://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...