Сортировка искр с помощью JavaRDD <String> - PullRequest
0 голосов
/ 25 апреля 2018

Допустим, у меня есть файл со строкой строк, и я импортирую его в JavaRDD, если я пытаюсь отсортировать строки и экспортировать как новый файл, как мне это сделать?Приведенный ниже код является моей попыткой, и он не работает

JavaSparkContext sparkContext = new JavaSparkContext("local[*]", "Spark Sort");
Configuration hadoopConfig = sparkContext.hadoopConfiguration();
hadoopConfig.set("fs.hdfs.imp", DistributedFileSystem.class.getName());
hadoopConfig.set("fs.file.impl", LocalFileSystem.class.getName());
JavaRDD<String> lines = sparkContext.textFile(args[0]);
JavaRDD<String> sorted = lines.sortBy(i->i, true,1);
sorted.saveAsTextFile(args[1]);

Что я имею в виду под "не работает", так это то, что выходной файл не отсортирован.Я думаю, что проблема связана с моим кодом «i-> i», я не уверен, как сделать так, чтобы он сортировался с помощью метода сравнения строк, так как каждое «i» будет строкой (также не уверен, как заставить его сравнивать разные"i"

EDIT Я изменил код в соответствии с комментариями, я подозреваю, что файл читается как 1 гигантская строка.

JavaSparkContext sparkContext = new JavaSparkContext("local[*]", "Spark Sort");
Configuration hadoopConfig = sparkContext.hadoopConfiguration();
hadoopConfig.set("fs.hdfs.imp", DistributedFileSystem.class.getName());
hadoopConfig.set("fs.file.impl", LocalFileSystem.class.getName());
long start  = System.currentTimeMillis();

List<String> array = buildArrayList(args[0]);
JavaRDD<String> lines = sparkContext.parallelize(array);
JavaRDD<String> sorted = lines.sortBy(i->i, true, 1);
sorted.saveAsTextFile(args[1]);

Все еще не сортируетсяэто: (

1 Ответ

0 голосов
/ 26 апреля 2018

Я провел небольшое исследование.Ваш код правильный.Вот примеры, которые я тестировал:

Инициализация искры

SparkSession spark = SparkSession.builder().appName("test")
        .config("spark.debug.maxToStringFields", 10000)
        .config("spark.sql.tungsten.enabled", true)
        .enableHiveSupport().getOrCreate();

JavaSparkContext jSpark = new JavaSparkContext(spark.sparkContext());

Пример для СДР

//RDD
JavaRDD rdd = jSpark.parallelize(Arrays.asList("z", "b", "c", "a"));
JavaRDD sorted = rdd.sortBy(i -> i, true, 1);
List<String> result = sorted.collect();
result.stream().forEach(i -> System.out.println(i));

Выход

a
b
c
z

Вытакже можно использовать API набора данных // Набор данных

Dataset<String> stringDataset = spark.createDataset(Arrays.asList("z", "b", "c", "a"), Encoders.STRING());
Dataset<String> sortedDataset = stringDataset.sort(stringDataset.col(stringDataset.columns()[0]).desc()); //by defualt is ascending order
result = sortedDataset.collectAsList();
result.stream().forEach(i -> System.out.println(i));

Вывод:

z
c
b
a

Ваша проблема, я думаю, в том, что в вашем текстовом файле есть определенный разделитель строк.Если это так - вы можете использовать функцию flatMap, чтобы разбить вашу гигантскую текстовую строку на строки.Вот пример с набором данных // пример flatMap

Dataset<String> singleLineDS= spark.createDataset(Arrays.asList("z:%b:%c:%a"),  Encoders.STRING());
Dataset<String> splitedDS = singleLineDS.flatMap(i->Arrays.asList(i.split(":%")).iterator(),Encoders.STRING());
Dataset<String> sortedSplitedDs = splitedDS.sort(splitedDS.col(splitedDS.columns()[0]).desc());
result = sortedSplitedDs.collectAsList();
result.stream().forEach(i -> System.out.println(i));

Таким образом, вы должны найти, какой разделитель находится в вашем текстовом файле, и принять приведенный выше код для вашей задачи

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...