Как в приложении Spark создать CSV-файл из DataFrame (Scala)? - PullRequest
0 голосов
/ 19 января 2019

Мой следующий вопрос не нов, но я хочу понять, как сделать его шаг за шагом.

В приложении Spark я создаю DataFrame.Давайте назовем это df.Версия Spark: 2.4.0

val df: DataFrame  = Seq(
    ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
    ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
    ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
    ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
    ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

Как создать файл .csv из этого DataFrame и поместить файл CSV в определенную папку на сервере?

Например, правильный ли это код?Я заметил, что некоторые люди используют coalesce или repartition для этой задачи.Но я не понимаю, какой из них будет лучше в моем случае.

union.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/home/reports/")

Когда я пытаюсь использовать следующий код, это вызывает ERROR:

org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/home/reports/_temporary/0":hdfs:hdfs:drwxr-xr-x 

Я запускаю приложение Sparkкак root пользователь.reports папка, созданная пользователем root с помощью следующей команды:

mkdir -m 777 reports

Кажется, что только hdfs пользователь может записать файл.

1 Ответ

0 голосов
/ 20 января 2019

Я полагаю, вы не понимаете, как Spark ведет себя, я бы порекомендовал вам сначала прочитать официальную документацию и / или какое-нибудь руководство. Тем не менее, я надеюсь, что это отвечает на ваш вопрос.

Этот код сохранит DataFrame как SINGLE CSV Файл в локальной файловой системе.
Он был протестирован с Spark 2.4.0 с Scala 2.12.8 на Ubuntu 18.04 ноутбуке.

import org.apache.spark.sql.SparkSession

val spark =
  SparkSession
    .builder
    .master("local[*]")
    .appName("CSV Writter Test")
    .getOrCreate()
import spark.implicits._

val df =
  Seq(
    ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
    ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
    ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
    ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
    ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
  ).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

df.printSchema
// root
//  |-- NAME: string (nullable = true)
//  |-- START_DATE: string (nullable = true)
//  |-- END_DATE: string (nullable = true)
//  |-- STATUS: string (nullable = true)

df.coalesce(numPartitions = 1)
  .write
  .option(key = "header", value = "true")
  .option(key = "sep", value = ",")
  .option(key = "encoding", value = "UTF-8")
  .option(key = "compresion", value = "none")
  .mode(saveMode = "OVERWRITE")
  .csv(path = "file:///home/balmungsan/dailyReport/") // Change the path. Note there are 3 /, the first two are for the file protocol, the third one is for the root folder.

spark.stop()

Теперь давайте проверим сохраненный файл.

balmungsan@BalmungSan:dailyReport $ pwd
/home/balmungsan/dailyReport

balmungsan@BalmungSan:dailyReport $ ls
part-00000-53a11fca-7112-497c-bee4-984d4ea8bbdd-c000.csv  _SUCCESS

balmungsan@BalmungSan:dailyReport $ cat part-00000-53a11fca-7112-497c-bee4-984d4ea8bbdd-c000.csv 
NAME,START_DATE,END_DATE,STATUS
Alex,2018-01-01 00:00:00,2018-02-01 00:00:00,OUT
Bob,2018-02-01 00:00:00,2018-02-05 00:00:00,IN
Mark,2018-02-01 00:00:00,2018-03-01 00:00:00,IN
Mark,2018-05-01 00:00:00,2018-08-01 00:00:00,OUT
Meggy,2018-02-01 00:00:00,2018-02-01 00:00:00,OUT

Файл _SUCCESS существует, чтобы сигнализировать об успешной записи.

Важные примечания:

  • Вам необходимо указать протокол file:// для сохранения в локальной файловой системе, а не в HDFS .
  • Путь указывает имя папки для сохранения разделов файла, а не имя файла, внутри этой папки будет один файл на раздел. Если вы хотите снова прочитать такой файл с помощью Spark , то вам нужно только указать папку, Spark будет понимать файлы разделов. Если нет, я бы порекомендовал переименовать файл после того, как - насколько я знаю, нет никакого способа контролировать имя от Spark .
  • Если df слишком большой, чтобы поместиться в память только одного узла, задание не будет выполнено.
  • Если вы выполните это распределенным способом (например, с основной пряжей) , то файл будет сохранен не в главном узле, а в одном из подчиненных узлов. Если вам действительно нужно, чтобы он был в главном узле, то вы можете собрать его и написать с обычной Scala , как предположил Дмитрий.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...