У меня есть дополнительные вопросы после этого поста: https://stackoverflow.com/a/39398750/5060792
У меня есть кластер из 4 узлов (1 драйвер, 3 рабочих), где каждый рабочий имеет 16 ядер и 62 ГБ оперативной памяти, а драйвер имеет 8ядер и 12 Гбайт оперативной памяти.
Таким образом, следуя «практическому правилу» разбиения, разделы должны быть (количество рабочих узлов * число исполнителей на рабочий узел * ядер на исполнителя) * 3 или 4. С динамическим распределениемЯ не совсем уверен, сколько исполнителей запущено на каждом узле, но предполагая, что 3 исполнителя на рабочий узел с 5 ядрами на исполнителя, что будет: 3 * 3 * 5 * 4 = 180. Таким образом, 180 разделов должны быть близки к оптимальным?
Учитывая приведенный ниже воспроизводимый код (где df
- это 125 000 строк данных с колонкой String 'text').При динамическом размещении spark помещает импортированный фрейм данных в один раздел моего кластера.
count()
из df
занимает от 8 до 10 секунд до .repartition(180)
и от 1 до 2 секунд после.Принимая во внимание, что функция .rdd
addArrays
занимает около 8-10 секунд до .repartition(180)
и 150 секунд после.
Примечание: Я застрял с использованием spark 2.2.0, поэтому функции массива spark sql функционируютнедоступны для меня.
Запуск .repartition(1)
впоследствии не ускоряется addArrays
, это продолжается около 2,5 минут.Тем не менее, воссоздание ngrams
df с нуля снова, когда spark помещает все в один раздел, ускоряет его до нескольких секунд.
Короче: count()
становится быстрее, .rdd.map()
становится медленнее.
Я могу повторить эти сценарии много раз.Перераспределение до того, как я применю какую-либо функцию или после, не меняет времени на сколько-нибудь заметную величину.
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import NGram
from pyspark.ml import Pipeline
spark = (
SparkSession.builder.master('yarn').appName('test')
.config('spark.kryoserializer.buffer.max', '1g')
.config('spark.sql.cbo.enabled', True)
.config('spark.sql.cbo.joinReorder.enabled', True)
.config('spark.yarn.executor.memoryOverhead', '2g')
.config('spark.driver.maxResultSize', '2g')
.config("spark.port.maxRetries", 100)
.config('spark.dynamicAllocation.enabled', 'true')
.config('spark.dynamicAllocation.executorIdleTimeout', '60')
.config('spark.dynamicAllocation.maxExecutors', '56')
.config('spark.dynamicAllocation.minExecutors', '0')
.config('spark.dynamicAllocation.schedulerBacklogTimeout', '1')
.getOrCreate()
)
sc = spark.sparkContext
sc.defaultParallelism
## my defaultParallelism is 2
placeholder = (
r"Lorem ipsum dolor sit amet, consectetur adipiscing elit, "
r"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. "
r"Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris "
r"nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in "
r"reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
r"pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
r"culpa qui officia deserunt mollit anim id est laborum."
)
df = (
spark.range(0, 250000, 1)
.withColumn('rand1', (F.rand(seed=12345) * 50).cast(T.IntegerType()))
.withColumn('text', F.lit(placeholder))
.withColumn('text', F.expr("substring_index(text, ' ', rand1)" ))
.withColumn('text', F.split(F.col('text'), ' '))
.select('text')
)
## Saving and reloading puts into 1 partition on my cluster.
df.write.parquet("df.parquet", mode='overwrite')
df = spark.read.parquet("df.parquet")
!hdfs dfs -du -h
## 1.4 M 4.3 M df.parquet
ngram01 = NGram(n=1, inputCol="text", outputCol="ngrams01")
ngram02 = NGram(n=2, inputCol="text", outputCol="ngrams02")
ngram03 = NGram(n=3, inputCol="text", outputCol="ngrams03")
ngram04 = NGram(n=4, inputCol="text", outputCol="ngrams04")
ngram05 = NGram(n=5, inputCol="text", outputCol="ngrams05")
ngram_pipeline = (
Pipeline()
.setStages([ngram01, ngram02, ngram03, ngram04, ngram05])
)
ngrams = (
ngram_pipeline
.fit(df)
.transform(df)
)
'''RDD Function to combine single-ngram Arrays.'''
colsNotNGrams = [c for c in ngrams.columns if 'ngrams' not in c]
colsNotNGramsTpls = ['(row.{},)'.format(c) for c in ngrams.columns if 'ngrams' not in c]
rddColTupls = ' + '.join(colsNotNGramsTpls)
def addArrays(row):
return (
eval( rddColTupls )
+ (row.ngrams01 + row.ngrams02 + row.ngrams03,)
+ (row.ngrams01 + row.ngrams02 + row.ngrams03 + row.ngrams04 + row.ngrams05,)
)
''' timings before repartitioning '''
ngrams.rdd.getNumPartitions()
# output is 1
ngrams.count()
# takes 8 to 10 seconds
ngrams2 = (
ngrams
.rdd.map(addArrays)
.toDF(colsNotNGrams + ['ngrams_1to3', 'ngrams_1to5'])
)
## takes 8 to 10 seconds
''' timings after repartitioning '''
ngrams = ngrams.repartition(180)
ngrams.rdd.getNumPartitions()
# output is 180
ngrams2 = (
ngrams
.rdd.map(addArrays)
.toDF(colsNotNGrams + ['ngrams_1to3', 'ngrams_1to5'])
)
## now takes 2.5 minutes
## HOWEVER,
ngrams.count()
# now takes 1 to 2 seconds
''' timings after repartitioning again does not help '''
ngrams = ngrams.repartition(1)
ngrams.rdd.getNumPartitions()
# output is 1
ngrams2 = (
ngrams
.rdd.map(addArrays)
.toDF(colsNotNGrams + ['ngrams_1to3', 'ngrams_1to5'])
)
## still takes 2.5 minutes