Какой вариант выбрать для записи CSV-файла в Spark (HDFS)? - PullRequest
0 голосов
/ 08 июня 2018

Мне нужно сравнить файлы CSV, а затем удалить все дублирующиеся строки.Итак, мое состояние таково, что у меня есть одна папка, и я должен поместить каждый отфильтрованный результат в эту папку, и когда придет какой-то новый файл, я должен сравнить существующие файлы в папке с новым и, наконец, я должен поместитьобратно результат в ту же папку.

eg: /data/ingestion/file1.csv

   a1 b1 c1

   a2 b2 c2

   a3 b3 c3

/data/ingestion/file2.csv

   a4 b4 c4

   a5 b5 c5

   a6 b6 c6

new upcoming file(upcoming_file.csv):

   a1 b1 c1

   a5 b5 c5

   a7 b7 c7

Теперь мой подход заключается в создании одного кадра данных из всех файлов, представленных в / data / ingestion / *.Затем создайте один информационный кадр для upcoming_file.csv и добавьте их оба, используя операцию объединения.Наконец, применяя четкие преобразования.Теперь я должен записать его обратно в / data / ingestion, убедившись, что не будет никакой двуличности.Итак, я выбираю операцию перезаписи.

deleted_duplicate.write
  .format("csv")
  .mode("overwrite")
  .save("hdfs://localhost:8020/data/ingestion/")

Затем я в конечном итоге удаляю все, что находится внутри папки / data / ingestion.Даже новый фрейм данных не записывается в файлы CSV.

Я пробовал и другие варианты, но я не достиг того, что объяснил выше!

Заранее спасибо!

1 Ответ

0 голосов
/ 08 июня 2018

Я предлагаю записать вывод в новый каталог в формате hdf - в случае сбоя обработки вы всегда сможете отказаться от всего, что было обработано, и запустить обработку с нуля с исходными данными - это безопасно и просто.:)

Когда обработка будет завершена - просто удалите старую и переименуйте новую в имя старой.

ОБНОВЛЕНИЕ:

deleted_duplicate.write
  .format("csv")
  .mode("overwrite")
  .save("hdfs://localhost:8020/data/ingestion_tmp/")

   Configuration conf = new Configuration();
    conf.set("fs.hdfs.impl",org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
    conf.set("fs.file.impl",org.apache.hadoop.fs.LocalFileSystem.class.getName());
    FileSystem  hdfs = FileSystem.get(URI.create("hdfs://<namenode-hostname>:<port>"), conf);
    hdfs.delete("hdfs://localhost:8020/data/ingestion", isRecusrive);
    hdfs.rename("hdfs://localhost:8020/data/ingestion_tmp", "hdfs://localhost:8020/data/ingestion");

Здесь - ссылка на документацию по HDFS FileSystem API

...