Оптимизация производительности чтения и записи Spark - PullRequest
0 голосов
/ 18 апреля 2020

У меня есть около 12K двоичных файлов, каждый размером 100 МБ и содержит несколько сжатых записей с переменными длины. Я пытаюсь найти наиболее эффективный способ их чтения, распаковки и последующей записи в формате паркета. Кластер, который у меня есть, состоит из 6 узлов с 4 ядрами в каждом.

В настоящий момент с псевдокодом, представленным ниже, чтение всех файлов занимает около 8 часов, а обратная запись в паркет идет очень медленно.

def reader(file_name):
    keyMsgList = []
    with open(file_name, "rb") as f:
        while True:
            header = f.read(12)
            if not header:
                break
            keyBytes = header[0:8]
            msgLenBytes = header[8:12]

            # conver keyBytes & msgLenBytes to int 
            message = f.read(msgLen)
            keyMsgList.append((key, decode(message)))
    return keyMsgList
files = os.listdir("/path/to/binary/files")
rddFiles = sc.parallelize(files, 6000)
df = spark.createDataFrame(rddFiles.flatMap(reader), schema)
df.repartition(6000).write.mode("append").partitionBy("key").parquet("/directory")

Рациональный выбор 6000 здесь sc.parallelize(files, 6000) - создание разделов, каждый размером 200 МБ, т.е. (12k files * 100mb size) / 200MB. Являясь последовательной природой файлового содержимого, которое необходимо для чтения каждого из них побайтно, не уверены, что чтение может быть дополнительно оптимизировано? Точно так же при обратной записи в паркет число в repartition(6000) должно гарантировать, что данные распределены равномерно и все исполнители могут писать параллельно. Однако это оказывается очень медленной операцией.

Одним из решений является увеличение числа исполнителей, что улучшит производительность чтения, но не уверен, улучшит ли это запись?

Ищете Любое предложение о том, как я могу улучшить производительность здесь?

1 Ответ

0 голосов
/ 18 апреля 2020

Предложение 1: не используйте repartition, но coalesce.

См. здесь . Вы определили узкое место операции repartition, это потому, что вы запустили полную перемешивание. С coalesce вы этого не сделаете. Вы также получите N разделов. Они не будут такими же сбалансированными, как те, которые вы получили бы с repartition, но имеет ли это значение?

Я бы порекомендовал вам отдать предпочтение coalesce, а не repartition

Предложение 2: 6000 разделов может быть не оптимальным

Ваше приложение работает с 6 узлами с 4 ядрами. У вас есть 6000 разделов. Это означает, что у вас есть около 250 разделов по ядру (даже не считая того, что дано вашему мастеру). Это, на мой взгляд, слишком много.

Поскольку ваши разделы малы (около 200 МБ), ваш мастер, вероятно, тратит больше времени на ожидание ответа от исполнителя, чем на выполнение запросов.

Я бы порекомендовал вам уменьшить количество разделов

Предложение 3: можете ли вы использовать API DataFrame?

Операции API DataFrame, как правило, быстрее и лучше, чем решение с ручным кодированием.

Может быть, посмотрите на pyspark.sql.functions, чтобы увидеть, можете ли вы там что-то найти (см. здесь ). Я не знаю, является ли это полезным, так как я не видел ваши данные, но это общая рекомендация, которую я делаю из своего опыта.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...