Java Spark - Проблема с фильтрацией записей в RDD по количеству столбцов. - PullRequest
0 голосов
/ 07 мая 2020

Это другая проблема, я пытаюсь отфильтровать записи в RDD по количеству столбцов. Это больше похоже на обработку файлов.

Я написал то же самое в Pyspark и вижу, что записи фильтруются правильно. Когда я пытаюсь ввести Java, действительные записи попадают в папку ошибок.

Загрузил файлы ошибок и проверил их с помощью AWK, чтобы обнаружить, что они имеют столбец 996, но все еще отфильтровываются в ошибке.

В python точное количество отфильтрованных файлов являются файлами ошибок.

Ниже приведен фрагмент фрагмента.


JavaRDD<String> inputDataRDD = sc.textFile(args[0]+"/"+args[1], 5000);

int columnLength = Integer.parseInt(args[3]);

inputDataRDD
.filter(filterData -> filterData.split("\t").length == columnLength)
.coalesce(1)
.saveAsTextFile(args[2]+"Valid/", GzipCodec.class);

inputDataRDD
.filter(filterData -> filterData.split("\t").length != columnLength)
.coalesce(1)
.saveAsTextFile(args[2]+"Error/", GzipCodec.class);

Конец фрагмента. .

В этом файле около 10M записей.

Есть ли разница в s c .textfile (filename, int numPartitions) между Java и Python или Я что-то упустил?

Нужна ваша помощь, чтобы выяснить ошибку, которую я сделал.

Примечание: -Сделал сборку maven с использованием eclipse и выполнил следующую команду в Yarn.

spark-submit --class com.virtualpairprogrammers.ProcessFilesToHDFS --master yarn learningSpark-0.0.1-SNAPSHOT.jar "/input/ABFeeds/" "ABFeeds_2020-04-20.tsv.gz" "/output/ABFeeds/2020-05-06/" 996

Заранее спасибо

С уважением

Сэм

1 Ответ

1 голос
/ 08 мая 2020

Проблема связана с командой Split, которую я использовал. Проблема: - Когда последний столбец пуст, разделение Java не может рассматривать его как столбец. Я сослался на следующий сайт, на котором говорится о проблеме с разделением

Java Разделение строки удалило пустые значения

Старый фрагмент:

inputDataRDD
    .filter(filterData -> filterData.split("\t").length == columnLength)
    .coalesce(1)
    .saveAsTextFile(args[2]+"Valid/", GzipCodec.class);

Модифицированный сниппет:

inputDataRDD
    .filter(filterData -> filterData.split("\t",-1).length == columnLength)
    .coalesce(1)
    .saveAsTextFile(args[2]+"Valid/", GzipCodec.class);

Я протестировал его, и он работает.

Спасибо всем за помощь.

С уважением

...