Как закрепить запись файла в Spark? - PullRequest
0 голосов
/ 12 марта 2019

У меня есть код, который читает два файла orc как два кадра данных и объединяет их в один кадр данных. Мой код, а затем записывает этот фрейм данных в файл. Я пытался узнать время каждого шага, не записывая вывод, это занимает одну минуту, но когда я вставляю код написания, это занимает около 38 минут. Данные 5 ГБ, 100 миллионов строк и 50 столбцов.

Код:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrameReader, DataFrameWriter
from datetime import datetime

import time

# @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("All imports were successful.")

df = spark.read.orc(
    's3://****'
)
print("First dataframe read with headers set to True")
df2 = spark.read.orc(
    's3://****'
)
print("Second dataframe read with headers set to True")

# Obtain columns lists
left_cols = df.columns
right_cols = df2.columns

# Prefix each dataframe's field with "left_" or "right_"
df = df.selectExpr([col + ' as left_' + col for col in left_cols])
df2 = df2.selectExpr([col + ' as right_' + col for col in right_cols])

# Perform join
# df3 = df.alias('l').join(df2.alias('r'), on='l.left_c_0' == 'r.right_c_0')

# df3 = df.alias('l').join(df2.alias('r'), on='c_0')

df3 = df.join(
    df2,
    df["left_c_0"] == df2["right_c_0"]
)

print("Dataframes have been joined successfully.")
output_file_path = 's3://****'.format(
    datetime.utcnow()
)

df3.write.orc(
    output_file_path
)
# print("Dataframe has been written to csv.")
job.commit()

Просто чтобы прояснить, когда я закомментировал df3.write.orc( output_file_path ), я мог заставить скрипт работать <1 минута. </p>

Я не могу вывести какой-либо метод для более быстрого выполнения задачи записи. Можно ли здесь провести распараллеливание?

Обратите внимание, выходные файлы записаны по частям.

1 Ответ

2 голосов
/ 12 марта 2019

При создании фрейма данных он не читается из s3. Фрейм данных - это информация о том, откуда (и как) читать данные. То же самое применимо, когда вы соединяете фреймы данных, он только что создал новый фрейм данных, который знает, что предыдущие два фрейма данных должны быть обработаны для получения результата.

Наконец, когда вы вызываете df3.write, это когда данные для df1 и df2 считываются и обрабатываются. По сути, вся обработка приложения (чтение данных из s3, переименование столбцов, объединение двух фреймов данных) происходит во время вызова df3.write.
Вот почему сценарий завершается менее чем за 1 секунду, если вы удалите последний оператор df3.write (потому что на самом деле ничего не было сделано).

Реальный вопрос в том, как вы можете сделать это приложение быстрее. Это зависит от множества факторов, таких как размер входных данных, объем памяти и количество процессорных ядер, доступных для работы.

...