Apache Spark принимает меры для исполнителей в полностью распределенном режиме - PullRequest
1 голос
/ 27 мая 2020

Я новичок в Spark, у меня есть базовое c представление о том, как работают преобразование и действие ( руководство ). Я пытаюсь выполнить некоторые операции НЛП для каждой строки (в основном абзацев) в текстовом файле. После обработки результат должен быть отправлен на сервер (REST Api) для хранения. Программа запускается как искровое задание (отправленное с помощью spark-submit) в кластере из 10 узлов в режиме yarn. Это то, что я делал до сих пор.

...
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<String> processedLines = lines
    .map(line -> {
        // processed here
        return result;
    });
processedLines.foreach(line -> {
    // Send to server
});

Это работает, но foreach l oop кажется последовательным, похоже, что он не работает в распределенном режиме на рабочих узлах. Я прав?

Я пробовал использовать следующий код, но он не работает. Ошибка: java: incompatible types: inferred type does not conform to upper bound(s). Очевидно, это неправильно, потому что map - это преобразование, а не действие.

lines.map(line -> { /* processing */ })
     .map(line -> { /* Send to server */ });

Я также пробовал с take(), но для этого требуется int, а processedLines.count() имеет тип long.

processedLines.take(processedLines.count()).forEach(pl -> { /* Send to server */ });

Объем данных огромен (больше 100 ГБ). Я хочу, чтобы и обработка, и отправка на сервер выполнялись на рабочих узлах. Часть обработки в map демонстративно происходит на рабочих узлах. Но как мне отправить обработанные данные с рабочих узлов на сервер, потому что foreach кажется последовательным l oop, имеющимся в драйвере (если я прав). Проще говоря, как выполнить action на рабочих узлах, а не в программе драйвера.

Любая помощь будет принята с благодарностью.

Ответы [ 2 ]

1 голос
/ 27 мая 2020

Во-первых, когда ваш код работает на Executors, он уже находится в распределенном режиме, когда вы хотите использовать все ресурсы ЦП на Executors для большего параллелизма, вам следует go для некоторых параметров async и, что более предпочтительно, с работой в пакетном режиме, чтобы избегайте избыточного создания объектов подключения клиента, как показано ниже.

Вы можете заменить свой код на

processedLines.foreach(line -> {

любым из решений

processedLines.foreachAsync(line -> {
    // Send to server
}).get();

//To iterate batch wise I would go for this
processedLines.foreachPartitionAsync(lineIterator -> {
// Create your ouput client connection here
    while (lineIterator.hasNext()){
        String line  = lineIterator.next();
    }
}).get();

Обе функции будут создавать объект Future, или отправьте новый поток, или вызов разблокировки, который автоматически добавит параллелизм в ваш код.

1 голос
/ 27 мая 2020

foreach - действие в искре. По сути, он берет каждый элемент RDD и применяет функцию к этому элементу.

foreach выполняется на узлах исполнителя или рабочих узлах. Это не применяется к узлу драйвера. Обратите внимание, что в режиме локального выполнения при запуске искры и драйвер, и узел исполнителя могут находиться на одной JVM.

Проверьте это для справки объяснение foreach

Ваш подход выглядит нормально где вы пытаетесь сопоставить каждый элемент RDD, а затем применить foreach к каждому элементу. Причина, по которой я могу думать, почему это занимает время, заключается в размере данных, с которыми вы имеете дело (~ 100 ГБ).

Одним из способов оптимизации этого является repartition набор входных данных. В идеале размер каждого раздела должен быть 128 МБ для повышения производительности. Вы найдете множество статей о передовых методах перераспределения данных. Я бы посоветовал вам следовать им. Это даст некоторый выигрыш в производительности.

Вторая оптимизация, о которой вы можете подумать, - это память, которую вы назначаете каждому узлу-исполнителю. Он играет очень важную роль при настройке искры.

Третья оптимизация, о которой вы можете подумать, - это пакетный сетевой вызов на сервер. В настоящее время вы выполняете сетевые вызовы на сервер для каждого элемента RDD. Если ваш дизайн позволяет вам группировать эти сетевые вызовы, вы можете отправлять более 1 элемента за один сетевой вызов. Это также может помочь, если задержка в основном связана с этими сетевыми вызовами.

Надеюсь, это поможет.

...