У меня есть три узла Spark кластер (8 ядер и 16 ГБ ОЗУ каждый) в автономном режиме.Я использую Elasticsearch-hadoop разъем для чтения индекса ES.Индекс действительно огромен: более 100 миллионов документов, 5 сегментов и 2 репликации.Когда я создаю фрейм данных с помощью Spark, я хочу, чтобы эта операция загрузки была распараллелена, но я вижу, что она обрабатывается только в драйвере, а не на исполнителях.Одна эта операция занимает более 8 часов для загрузки.Как я могу оптимизировать это и позволить всем рабочим узлам загружать данные параллельно?
Я отправляю задание, используя 16 исполнителей, каждый с 1 ядром и 2 ГБ памяти, и драйвер с 4 ГБ памяти.
df = sqlContext.read.format("org.elasticsearch.spark.sql").option("es.nodes", es_ip).load(es_index)
df.coalesce(16).write.option("compression","gzip").parquet(pq_filename)
После загрузки я конвертирую это в Parquet и сохраняю в HDFS.