Spark DataFrame добавляет строку n-раз на основе значения весового столбца - PullRequest
0 голосов
/ 27 апреля 2018

То, что я пытаюсь сделать, - это «перепробовать» небольшой CSV-файл со столбцом значения веса в каждой строке.

Age|City|Weight
20 | NY |2
30 | SF |3

до

 Age|City|
 20 | NY |
 20 | NY |
 30 | SF |
 30 | SF |
 30 | SF |

С пандой и нп я это сделал

df = pd.read_csv('file.csv',low_memory=False)
weights=round(df.weight)
df.loc[np.repeat(df.index.values,weights)]

Но это слишком медленно, он использует 100% 1 ЦП (из 15 доступных) и всю память 65 ГБ в течение более 24 часов и, наконец, дает сбой. Финальный файл должен содержать более 70 миллионов строк.

Так что я попробую с Spark.

rdd.map(lamba x: rdd.udf()) или что-то подобное в сочетании с explode() должно помочь, но я не понимаю, как сделать это правильно. В конце мне нужно сохранить DataFrame или RDD в одном CSV, не разделенном на разделы: CSV, который я могу использовать обратно с Panda.

Спасибо!

Ответы [ 2 ]

0 голосов
/ 30 апреля 2018

Ну, мне удалось заставить его работать благодаря:

with open("file_in.csv",encoding='utf-8') as fr, open("file_out.csv", "w",encoding='utf-8') as fw:
reader = csv.reader(fr)
writer = csv.writer(fw)

for row in reader:
    if row[0] == "firstColName" :
        writer.writerow(row[:-1])  
    else:
        writer.writerows(row[:-1] for _ in range(int(row[-1])))
0 голосов
/ 27 апреля 2018

Ничто не говорит о том, что вам нужен Spark, если вы можете загружать данные в память и планируете читать вывод локально с помощью Pandas. Просто сделай это просто

import csv

with open("input.csv") as fr, open("output.csv", "w") as fw:
    reader = csv.reader(fr)
    writer = csv.writer(fw)
    for age, city, weight in reader:
        if age == "age":
            writer.writerow((age, city))
        else:
            writer.writerows((age, city) for _ in range(int(weight)))

или с большим количеством столбцов (я предполагаю, что вес - это последний столбец, скорректируйте в соответствии с формой реальных данных):

with open("input.csv") as fr, open("output.csv", "w") as fw:
    reader = csv.reader(fr)
    writer = csv.writer(fw)
    for row:
        if row[0] == "age":
            writer.writerow(row[:-1])  
        else:
            writer.writerows(row[:-1] for _ in range(int(row[-1])))
...