Запись файла CSV с использованием Spark и java - обработка пустых значений и кавычек - PullRequest
6 голосов
/ 26 февраля 2020

Исходные данные находятся в наборе данных , и я пытаюсь записать в файл с разделителями канала и хочу, чтобы каждая непустая ячейка и ненулевые значения были помещены в кавычки. Пустые или нулевые значения не должны содержать кавычек

result.coalesce(1).write()
            .option("delimiter", "|")
            .option("header", "true")
            .option("nullValue", "")
            .option("quoteAll", "false")
            .csv(Location);

Ожидаемый вывод:

"London"||"UK"
"Delhi"|"India"
"Moscow"|"Russia"

Текущий вывод:

London||UK
Delhi|India
Moscow|Russia

Если я изменю "quoteAll" на " true ", вывод, который я получаю:

"London"|""|"UK"
"Delhi"|"India"
"Moscow"|"Russia"

Версия Spark 2.3 и java версия java 8

Ответы [ 3 ]

5 голосов
/ 03 марта 2020

Java ответ. CSV escape - это не просто добавление «символов вокруг. Вы должны обрабатывать» внутри строк. Итак, давайте используем StringEscapeUtils и определим UDF, который будет его вызывать. Затем просто примените UDF к каждому столбцу.

import org.apache.commons.text.StringEscapeUtils;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import java.util.Arrays;

public class Test {

    void test(Dataset<Row> result, String Location) {
        // define UDF
        UserDefinedFunction escape = udf(
            (String str) -> str.isEmpty()?"":StringEscapeUtils.escapeCsv(str), DataTypes.StringType
        );
        // call udf for each column
        Column columns[] = Arrays.stream(result.schema().fieldNames())
                .map(f -> escape.apply(col(f)).as(f))
                .toArray(Column[]::new);

         // save the result
        result.select(columns)
                .coalesce(1).write()
                .option("delimiter", "|")
                .option("header", "true")
                .option("nullValue", "")
                .option("quoteAll", "false")
                .csv(Location);
    }
}

Примечание: coalesce (1) - неправильный вызов. Он собирает все данные об одном исполнителе. Вы можете получить исполнителя OOM в производстве для огромного набора данных.

2 голосов
/ 02 марта 2020

РЕДАКТИРОВАТЬ & Предупреждение: Не видел тег java. Это решение Scala, которое использует foldLeft как al oop до go для всех столбцов. Если его заменить на Java дружественный l oop, все должно работать как есть. Я попытаюсь вернуться к этому позже.

Программным решением c может быть

val columns = result.columns
val randomColumnName = "RND"

val result2 = columns.foldLeft(result) { (data, column) =>
data
  .withColumnRenamed(column, randomColumnName)
  .withColumn(column,
    when(col(randomColumnName).isNull, "")
      .otherwise(concat(lit("\""), col(randomColumnName), lit("\"")))
  )
  .drop(randomColumnName)
}

. В результате получаются строки с " вокруг них и запись будет пустой. строки в нуль. Если вам нужно сохранить пустые значения, просто сохраните их.

Затем просто запишите это:

result2.coalesce(1).write()
            .option("delimiter", "|")
            .option("header", "true")
            .option("quoteAll", "false")
            .csv(Location);
0 голосов
/ 03 марта 2020

Это, конечно, не эффективный ответ, и я модифицирую его, основываясь на ответе Артема Алиева, но подумал, что он будет полезен немногим, поэтому размещение этого ответа

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;<br/>
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;<br/>
public class Quotes {<br/>
    private static final String DELIMITER = "|";
    private static final String Location = "Give location here";

    public static void main(String[] args) {

        SparkSession sparkSession = SparkSession.builder() 
                .master("local") 
                .appName("Spark Session") 
                .enableHiveSupport()
                .getOrCreate();

        Dataset<Row> result = sparkSession.read()
                .option("header", "true")
                .option("delimiter",DELIMITER)
                .csv("Sample file to read"); //Give the details of file to read here

      UserDefinedFunction udfQuotesNonNull = udf(
        (String abc) -> (abc!=null? "\""+abc+"\"":abc),DataTypes.StringType
      );

      result = result.withColumn("ind_val", monotonically_increasing_id()); //inducing a new column to be used for join as there is no identity column in source dataset


      Dataset<Row> dataset1 = result.select((udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val"))); //Dataset used for storing temporary results
      Dataset<Row> dataset = result.select((udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val")));  //Dataset used for storing output

      String[] str = result.schema().fieldNames();
      dataset1.show();
      for(int j=0; j<str.length-1;j++)
      {
        dataset1 = result.select((udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val")),(udfQuotesNonNull.apply(col(str[j]).cast("string")).alias("\""+str[j]+"\""))); 
        dataset=dataset.join(dataset1,"ind_val"); //Joining based on induced column
      }
      result = dataset.drop("ind_val");

      result.coalesce(1).write()
      .option("delimiter", DELIMITER)
      .option("header", "true")
      .option("quoteAll", "false")
      .option("nullValue", null)
      .option("quote", "\u0000") 
      .option("spark.sql.sources.writeJobUUID", false)
      .csv(Location);
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...