Имитировать потоковые данные после чтения из файла в Pyspark - PullRequest
0 голосов
/ 11 октября 2019

Итак, я читаю данные из файла. Что-то вроде

 data = spark.read.format('orc').load('myfilepath')

Теперь я хочу пройтись по каждой строке выше и создать два массива. Один для клиентов, которые купили товары и один для клиентов, которые продали товары. Мне нужно, чтобы это было упорядочено по цене, за которую они купили товар, и по времени, в которое они его купили, и, если они были возвращены, я хочу обновить / удалить его из массива. И для каждого момента времени я хочу видеть инвентарь. Например, допустим, у нас есть список, подобный таблице.

Item Bought Time | Item Price | Item Action        | Transaction Unique ID | Amount
     8.30             50          Bought               1                      2000
     8.31             51           Sold                2                      5000
     8.32             50       Bought Returned         1                      2000 
     8.33             52          Bought               3                      10000
     8.34             49          Bought               4                      3000 

Так что я прочитал выше в качестве кадра данных. Я хотел бы в каждый момент времени хранить массив для купленных и проданных товаров и упорядочивать их по цене и времени, чтобы в любой момент я мог сортировать данные, как указано выше для купленных и проданных предметов.

Что-то вроде

Купленный массив

    Time  Info
    8.30  [{50,2000}]
    8.31  [{50,2000}]
    8.32  []
    8.33  [{52,10000}]         
    8.34  [{49,3000}, {52,10000}]

Проданный массив

    Time  Info
    8.30  []
    8.31  [{51,5000}]
    8.32  [{51,5000}]
    8.33  [{51,5000}]         
    8.34  [{51,5000}]

Естьоколо 5 миллионов строк в день, поэтому он также должен быть производительным. Не могли бы вы дать мне знать, как лучше это сделать?

1 Ответ

0 голосов
/ 11 октября 2019

Вы можете использовать Window из pyspark.sql.window и найти кумулятивную сумму значений, затем позвонить collect_list

from pyspark.sql.window import Window
import pyspark.sql.functions as F

w = Window.partitionBy("Item Bought Time").orderBy("Item Bought Time")

df = df.withColumn("Cumsum_Price", F.sum("Item Price").over(w))
df_g = df.groupby("Item Bought Time").agg(F.collect_list("Cumsum_Price"))

Пожалуйста, укажите точную бизнес-логику, если вам нужна дополнительная помощь по ее включениючто можно сделать используя udf

...