как мне разрешить пандам работать с искровым кластером - PullRequest
0 голосов
/ 30 января 2019

главная проблема в pandas: он не может обрабатывать большие данные манипуляции, не хватает памяти для массивного файла CSV, теперь я перешел на pyspark 1.6 в Hadoop, я пробовал с dask.dataframe, но проблема все еще существует, есть липочему панды работают с кластером Hadoop или кластером pyspark, я хочу использовать эту функцию с pandas

import pandas as pd
df = pd.read_csv('text1.txt',names =['DATE','IMSI','WEBSITE','LINKUP','LINKDOWN','COUNT','CONNECTION'])
df.columns.str.strip()
df.DATE = pd.to_datetime(df.DATE)
group = df.groupby(['IMSI','WEBSITE']).agg({'DATE':[min,max,'count']
    ,'LINKUP':'sum'
    , 'LINKDOWN':'sum'
    , 'COUNT':'max'
    ,'CONNECTION':'sum'
            })
group.to_csv('finalinfo.txt', index = True, header = False)

1 Ответ

0 голосов
/ 03 февраля 2019

Чтение данных из HDFS, агрегирование и отправка обратно в панды.В приведенном ниже примере используется inferSchema для получения имен и типов столбцов на основе данных, но вы можете предоставить собственную схему, если у вашего файла нет заголовков или вам не нравятся типы, которые он выводит.InferSchema требует дополнительной передачи данных, поэтому в зависимости от размера данных вы можете указать собственную схему независимо от:

from pyspark.sql import functions as f

df = spark.read.csv('/hdfs/path/to/text1.txt', header=1, inferSchema=True, sep=';') 
df = df.groupBy('IMSI','WEBSITE').agg(f.min('DATE').alias('min of date'),
                                      f.max('DATE').alias('max of date'),
                                      f.count('DATE').alias('count of date'),
                                      f.sum('LINKUP').alias('sum of linkup'),
                                      f.sum('LINKDOWN').alias('sum of linkdown'),
                                      f.count('COUNT').alias('count of count'),
                                      f.sum('CONNECTION').alias('sum of connection'))
pandasDF = df.toPandas()

В качестве альтернативы, если файл для панд еще слишком велик, вы можете сохранить его в csv с помощью spark.Обратите внимание, что у вас нет контроля над именем выходного файла - вы только указываете местоположение каталога, который будет создан, и сохраняете выходные данные, а имя файла будет соответствовать соглашению об иске для именования временного файла:

df.coalesce(1).write.csv('/hdfs/path/to/output/directory', header=True)

coalesce (1) есть ли получить один файл в качестве вывода, так как спарк создаст количество файлов, равное разбиению (по умолчанию 200 iirc).Чтобы это работало, неразмеченный файл должен уместиться в памяти одного работника.Это все еще слишком большой, не используйте коалесцию.Spark сохранит его в нескольких файлах, и затем вы можете использовать HDFS getmerge , чтобы объединить файлы после слов.

...