У меня есть сложная функция complex_function
, которую я пытаюсь выполнить параллельно для большого набора данных. complex_function
делает кривую подгонку используя scipy и numpy. Я считываю данные в виде текстового потока text_stream
и использую text_stream.map(complex_function)
для параллельного выполнения complex_function
для каждого значения в СДР. У меня есть 4 узла с 2 исполнителями каждый, каждый из которых имеет 3 ядра и достаточно оперативной памяти для обработки раздела, над которым они работают. Однако я заметил, что один узел загружает процессор на 100%, в то время как другие остаются более или менее простаивающими вместе с очередью, полной ожидающих заданий. Это связано с complex_function
, и было бы лучше разбить complex_function
на несколько преобразований по СДР?
Такое ощущение, что я перепробовал все, начиная с изменения размера разделов и размера каждой передаваемой записи, а также максимальной скорости, с которой получатель будет читать записи. У меня есть печать вывода вместо записи в формате hdf. Я зашел так далеко, что включил spark.streaming.concurrentJobs
, который позволял выполнять задания параллельно, но из того, что я понял, это не решение моей проблемы.
sc = SparkContext()
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("textfile_server", 9999)
lines.map(complex_function)\
.saveAsTextFiles('hdfs://spark-namenode:9000/tmp/complex_function_output')
ssc.start()
ssc.awaitTermination()
Я ожидал, что СДР будет распределен по кластеру в зависимости от настройки раздела, где complex_function
будет параллельно работать на каждом. Похоже, что мое понимание того, как работают Hadoop и Spark, немного перепуталось.