Мой ввод - это файл csv / tsv или любой другой, разделенный разделителем, и его заголовок.Я хочу сопоставить любой столбец в качестве ключа и всю строку в качестве значения.Я выполнил приведенный ниже код нормально на своей машине, но потерпел неудачу при тестировании в режиме кластера пряжи.
public class SparkController implements java.io.Serializable {
String DELIMITER;
String[] header;
String path;
public static void main(String[] args) {
// some parse function
// say input file is a csv likes: (id,timestamp,ip)
// header = [ "id", "timestamp", "ip" ]
// DELIMITER = ","
SparkController sparkController = new SparkController();
sparkController.parseArgs(args);
JavaPariRDD<String, String> pairRdd = sparkController.map2PairRdd("ip");
}
private JavaPariRDD<String, String> map2PairRdd(String column) {
JavaRDD<String> rawFile = sc.textFile(path);
JavaPariRDD<String, String> pairRdd = rawFile.mapToPair((s) -> {
// DELIMITER can be accessed normally
String[] fields = s.split(DELIMITER);
// turns out header is empty when runs in yarn,
// but works fine in standalone mode
return new Tuple2<>(fields[header.indexOf("ip")], s);
});
// other operations continue
}
}
Я понимаю, что такие переменные, как DELIMITER
и header
, сериализуются работникам в режиме кластера.Но как массив header
может быть пустым внутри операции rdd.
Я изменяю код, объявляя окончательную переменную int index
вне mapToPair
и получаю доступ к index
внутри, тогда эта ошибка исправлена.
Но я все еще не понимаю, почему header
пуст, когда доступ внутри mapToPair
.Кто-нибудь может дать некоторые идеи?