При записи в путь hdfs возникает ошибка java .io.IOException: не удалось переименовать - PullRequest
0 голосов
/ 27 мая 2020

Я использую spark- sql -2.4.1v, который использует версию has oop -2.6.5.jar. Мне нужно сначала сохранить данные на hdfs, а позже перейти на кассандру. Следовательно, я пытаюсь сохранить данные на hdfs, как показано ниже:

String hdfsPath = "/user/order_items/";
cleanedDs.createTempViewOrTable("source_tab");

givenItemList.parallelStream().forEach( item -> {   
    String query = "select $item  as itemCol , avg($item) as mean groupBy year";
    Dataset<Row> resultDs = sparkSession.sql(query);

    saveDsToHdfs(hdfsPath, resultDs );   
});


public static void saveDsToHdfs(String parquet_file, Dataset<Row> df) {
    df.write()                                 
      .format("parquet")
      .mode("append")
      .save(parquet_file);
    logger.info(" Saved parquet file :   " + parquet_file + "successfully");
}

Когда я запускаю свою работу в кластере, он не выдает эту ошибку:

java.io.IOException: Failed to rename FileStatus{path=hdfs:/user/order_items/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.parquet; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to hdfs:/user/order_items/part-00007.parquet
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)

Пожалуйста, предложите, как исправить это проблема?

Ответы [ 2 ]

5 голосов
/ 27 мая 2020

Вы можете выполнить все выборки за одно задание, получить все выборки и объединить их в единую таблицу.

Dataset<Row> resultDs = givenItemList.parallelStream().map( item -> {   
    String query = "select $item  as itemCol , avg($item) as mean groupBy year";
    return sparkSession.sql(query);
}).reduce((a, b) -> a.union(b)).get

saveDsToHdfs(hdfsPath, resultDs );
1 голос
/ 27 мая 2020

Ошибка заключается в том, что вы пытаетесь записать фрейм данных в одно и то же место для каждого элемента в вашей коллекции givenItemList. Обычно при этом выдается ошибка:

OutputDirectory уже существует

Но поскольку функция foreach будет выполнять все элементы в параллельном потоке, вы получаете эту ошибку. можно указать отдельные каталоги для каждого потока, например:

givenItemList.parallelStream().forEach( item -> {   
String query = "select $item  as itemCol , avg($item) as mean groupBy year";
Dataset<Row> resultDs = sparkSession.sql(query);
saveDsToHdfs(Strin.format("%s_item",hdfsPath), resultDs );   

});

Или вы также можете иметь подкаталоги в hdfspath, например

givenItemList.parallelStream().forEach( item -> {   
String query = "select $item  as itemCol , avg($item) as mean groupBy year";
Dataset<Row> resultDs = sparkSession.sql(query);

saveDsToHdfs(Strin.format("%s/item",hdfsPath), resultDs );   

}); `

...