Чтение данных из HDFS, агрегирование и отправка обратно в панды.В приведенном ниже примере используется inferSchema для получения имен и типов столбцов на основе данных, но вы можете предоставить собственную схему, если у вашего файла нет заголовков или вам не нравятся типы, которые он выводит.InferSchema требует дополнительной передачи данных, поэтому в зависимости от размера данных вы можете указать собственную схему независимо от:
from pyspark.sql import functions as f
df = spark.read.csv('/hdfs/path/to/text1.txt', header=1, inferSchema=True, sep=';')
df = df.groupBy('IMSI','WEBSITE').agg(f.min('DATE').alias('min of date'),
f.max('DATE').alias('max of date'),
f.count('DATE').alias('count of date'),
f.sum('LINKUP').alias('sum of linkup'),
f.sum('LINKDOWN').alias('sum of linkdown'),
f.count('COUNT').alias('count of count'),
f.sum('CONNECTION').alias('sum of connection'))
pandasDF = df.toPandas()
В качестве альтернативы, если файл для панд еще слишком велик, вы можете сохранить его в csv с помощью spark.Обратите внимание, что у вас нет контроля над именем выходного файла - вы только указываете местоположение каталога, который будет создан, и сохраняете выходные данные, а имя файла будет соответствовать соглашению об иске для именования временного файла:
df.coalesce(1).write.csv('/hdfs/path/to/output/directory', header=True)
coalesce (1) есть ли получить один файл в качестве вывода, так как спарк создаст количество файлов, равное разбиению (по умолчанию 200 iirc).Чтобы это работало, неразмеченный файл должен уместиться в памяти одного работника.Это все еще слишком большой, не используйте коалесцию.Spark сохранит его в нескольких файлах, и затем вы можете использовать HDFS getmerge , чтобы объединить файлы после слов.