Я новичок в Spark, у меня есть базовое c представление о том, как работают преобразование и действие ( руководство ). Я пытаюсь выполнить некоторые операции НЛП для каждой строки (в основном абзацев) в текстовом файле. После обработки результат должен быть отправлен на сервер (REST Api) для хранения. Программа запускается как искровое задание (отправленное с помощью spark-submit) в кластере из 10 узлов в режиме yarn
. Это то, что я делал до сих пор.
...
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<String> processedLines = lines
.map(line -> {
// processed here
return result;
});
processedLines.foreach(line -> {
// Send to server
});
Это работает, но foreach
l oop кажется последовательным, похоже, что он не работает в распределенном режиме на рабочих узлах. Я прав?
Я пробовал использовать следующий код, но он не работает. Ошибка: java: incompatible types: inferred type does not conform to upper bound(s)
. Очевидно, это неправильно, потому что map
- это преобразование, а не действие.
lines.map(line -> { /* processing */ })
.map(line -> { /* Send to server */ });
Я также пробовал с take()
, но для этого требуется int
, а processedLines.count()
имеет тип long
.
processedLines.take(processedLines.count()).forEach(pl -> { /* Send to server */ });
Объем данных огромен (больше 100 ГБ). Я хочу, чтобы и обработка, и отправка на сервер выполнялись на рабочих узлах. Часть обработки в map
демонстративно происходит на рабочих узлах. Но как мне отправить обработанные данные с рабочих узлов на сервер, потому что foreach
кажется последовательным l oop, имеющимся в драйвере (если я прав). Проще говоря, как выполнить action
на рабочих узлах, а не в программе драйвера.
Любая помощь будет принята с благодарностью.