Оптимизация разделов Spark DataFrame - PullRequest
0 голосов
/ 30 апреля 2019

У меня есть дополнительные вопросы после этого поста: https://stackoverflow.com/a/39398750/5060792

У меня есть кластер из 4 узлов (1 драйвер, 3 рабочих), где каждый рабочий имеет 16 ядер и 62 ГБ оперативной памяти, а драйвер имеет 8ядер и 12 Гбайт оперативной памяти.

Таким образом, следуя «практическому правилу» разбиения, разделы должны быть (количество рабочих узлов * число исполнителей на рабочий узел * ядер на исполнителя) * 3 или 4. С динамическим распределениемЯ не совсем уверен, сколько исполнителей запущено на каждом узле, но предполагая, что 3 исполнителя на рабочий узел с 5 ядрами на исполнителя, что будет: 3 * 3 * 5 * 4 = 180. Таким образом, 180 разделов должны быть близки к оптимальным?

Учитывая приведенный ниже воспроизводимый код (где df - это 125 000 строк данных с колонкой String 'text').При динамическом размещении spark помещает импортированный фрейм данных в один раздел моего кластера.

count() из df занимает от 8 до 10 секунд до .repartition(180) и от 1 до 2 секунд после.Принимая во внимание, что функция .rdd addArrays занимает около 8-10 секунд до .repartition(180) и 150 секунд после.
Примечание: Я застрял с использованием spark 2.2.0, поэтому функции массива spark sql функционируютнедоступны для меня.

Запуск .repartition(1) впоследствии не ускоряется addArrays, это продолжается около 2,5 минут.Тем не менее, воссоздание ngrams df с нуля снова, когда spark помещает все в один раздел, ускоряет его до нескольких секунд.

Короче: count() становится быстрее, .rdd.map() становится медленнее.

Я могу повторить эти сценарии много раз.Перераспределение до того, как я применю какую-либо функцию или после, не меняет времени на сколько-нибудь заметную величину.

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import NGram
from pyspark.ml import Pipeline

spark = (
    SparkSession.builder.master('yarn').appName('test')
    .config('spark.kryoserializer.buffer.max', '1g')
    .config('spark.sql.cbo.enabled', True)
    .config('spark.sql.cbo.joinReorder.enabled', True)
    .config('spark.yarn.executor.memoryOverhead', '2g')
    .config('spark.driver.maxResultSize', '2g')
    .config("spark.port.maxRetries", 100)
    .config('spark.dynamicAllocation.enabled', 'true')
    .config('spark.dynamicAllocation.executorIdleTimeout', '60')
    .config('spark.dynamicAllocation.maxExecutors', '56')
    .config('spark.dynamicAllocation.minExecutors', '0')
    .config('spark.dynamicAllocation.schedulerBacklogTimeout', '1')
    .getOrCreate()
)

sc = spark.sparkContext

sc.defaultParallelism
## my defaultParallelism is 2

placeholder = (
    r"Lorem ipsum dolor sit amet, consectetur adipiscing elit, "
    r"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. "
    r"Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris "
    r"nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in "
    r"reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
    r"pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
    r"culpa qui officia deserunt mollit anim id est laborum."
)

df = (
    spark.range(0, 250000, 1)
    .withColumn('rand1', (F.rand(seed=12345) * 50).cast(T.IntegerType()))
    .withColumn('text', F.lit(placeholder))
    .withColumn('text', F.expr("substring_index(text, ' ', rand1)" ))
    .withColumn('text', F.split(F.col('text'), ' '))
    .select('text')
)

## Saving and reloading puts into 1 partition on my cluster.
df.write.parquet("df.parquet", mode='overwrite')
df = spark.read.parquet("df.parquet")

!hdfs dfs -du -h
## 1.4 M    4.3 M    df.parquet

ngram01 = NGram(n=1, inputCol="text", outputCol="ngrams01")
ngram02 = NGram(n=2, inputCol="text", outputCol="ngrams02")
ngram03 = NGram(n=3, inputCol="text", outputCol="ngrams03")
ngram04 = NGram(n=4, inputCol="text", outputCol="ngrams04")
ngram05 = NGram(n=5, inputCol="text", outputCol="ngrams05")

ngram_pipeline = (
    Pipeline()
    .setStages([ngram01, ngram02, ngram03, ngram04, ngram05])
)

ngrams = (
    ngram_pipeline
    .fit(df)
    .transform(df)
)

'''RDD Function to combine single-ngram Arrays.'''
colsNotNGrams = [c for c in ngrams.columns if 'ngrams' not in c]
colsNotNGramsTpls = ['(row.{},)'.format(c) for c in ngrams.columns if 'ngrams' not in c]
rddColTupls = ' + '.join(colsNotNGramsTpls)

def addArrays(row):
    return (
        eval( rddColTupls )
        + (row.ngrams01 + row.ngrams02 + row.ngrams03,) 
        + (row.ngrams01 + row.ngrams02 + row.ngrams03 + row.ngrams04 + row.ngrams05,)
    ) 


''' timings before repartitioning '''
ngrams.rdd.getNumPartitions()
# output is 1

ngrams.count()
# takes 8 to 10 seconds

ngrams2 = (
    ngrams
    .rdd.map(addArrays)
    .toDF(colsNotNGrams + ['ngrams_1to3', 'ngrams_1to5'])
)
## takes 8 to 10 seconds

''' timings after repartitioning '''
ngrams = ngrams.repartition(180)
ngrams.rdd.getNumPartitions()
# output is 180

ngrams2 = (
    ngrams
    .rdd.map(addArrays)
    .toDF(colsNotNGrams + ['ngrams_1to3', 'ngrams_1to5'])
)
## now takes 2.5 minutes 

## HOWEVER,
ngrams.count()
# now takes 1 to 2 seconds

''' timings after repartitioning again does not help '''
ngrams = ngrams.repartition(1)
ngrams.rdd.getNumPartitions()
# output is 1

ngrams2 = (
    ngrams
    .rdd.map(addArrays)
    .toDF(colsNotNGrams + ['ngrams_1to3', 'ngrams_1to5'])
)
## still takes 2.5 minutes 
...