Как добавить некоторую информацию в Dataframe? - PullRequest
0 голосов
/ 05 февраля 2019

У меня довольно простой DataFrame.

val df = Seq(
    ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
    ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
    ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
    ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
    ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

df.show()

В моем проекте Scala я конвертирую этот DataFrame в файл CSV.Мне нужно добавить некоторую информацию в начале, как вы можете видеть здесь в примере:

| REQUEST_DATE | 2019-02-05 20:00:00 |
| USER         | Kate                |
| SEARCH_TYPE  | Global              |

| NAME         | START_DATE          | END_DATE            | STATUS |
| Alex         | 2018-01-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
| Bob          | 2018-02-01 00:00:00 | 2018-02-05 00:00:00 | IN     |
| Mark         | 2018-02-01 00:00:00 | 2018-03-01 00:00:00 | IN     |
| Mark         | 2018-05-01 00:00:00 | 2018-08-01 00:00:00 | OUT    |
| Meggy        | 2018-02-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |

Я попытался создать новый DataFrame и присоединиться к ним вместе.К сожалению, вы не можете объединить 2 DataFrame с разными схемами.

1 Ответ

0 голосов
/ 05 февраля 2019

Если вы не хотите делать это после записи файла на диск, вы можете:

1.Превратите все в строку в обоих ваших фреймах данных.Тем не менее, вывод будет выглядеть так:

 | REQUEST_DATE | 2019-02-05 20:00:00 |                     |        |
 | USER         | Kate                |                     |        |
 | SEARCH_TYPE  | Global              |                     |        |
 |              |                     |                     |        |
 | NAME         | START_DATE          | END_DATE            | STATUS |
 | Alex         | 2018-01-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
 | Bob          | 2018-02-01 00:00:00 | 2018-02-05 00:00:00 | IN     |
 | Mark         | 2018-02-01 00:00:00 | 2018-03-01 00:00:00 | IN     |
 | Mark         | 2018-05-01 00:00:00 | 2018-08-01 00:00:00 | OUT    |
 | Meggy        | 2018-02-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
Создайте свой клиентский модуль вывода, который добавит ваш заголовок перед сохранением.Вы можете найти больше информации там - ищите часть для сохранения / записи.

Обновление

Если вы хотите сделать # 1Вот код для преобразования первого фрейма данных (с данными):

Dataset<Row> transitionDf = dataDf
    .withColumn("_c1", dataDf.col("NAME"))
    .withColumn("_c2",
        dataDf.col("START_DATE").cast(DataTypes.StringType))
    .withColumn("_c3",
        dataDf.col("END_DATE").cast(DataTypes.StringType))
    .withColumn("_c4", dataDf.col("STATUS").cast(DataTypes.StringType))
    .drop("NAME")
    .drop("START_DATE")
    .drop("END_DATE")
    .drop("STATUS");

Ключ к cast() вашим столбцам, чтобы вы могли использовать unionByName() для объединения обоих фреймов данных.Весь код на Java (я не делаю Scala) будет выглядеть примерно так:

package net.jgp.labs.spark.l901Union;

import java.sql.Date;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Use of unionByName() to create a complex header on a dataframe.
 * 
 * @author jgp
 */
public class UnionApp {
  private SimpleDateFormat format = null;

  public UnionApp() {
    this.format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  }

  /**
   * main() is your entry point to the application.
   * 
   * @param args
   * @throws ParseException
   */
  public static void main(String[] args) throws ParseException {
    UnionApp app = new UnionApp();
    app.start();
  }

  /**
   * The processing code.
   * 
   * @throws ParseException
   */
  private void start() throws ParseException {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("expr()")
        .master("local")
        .getOrCreate();

    // DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd
    // HH:mm:ss", Locale.ENGLISH);

    // Data
    StructType dataSchema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "NAME",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "START_DATE",
            DataTypes.DateType,
            false),
        DataTypes.createStructField(
            "END_DATE",
            DataTypes.DateType,
            false),
        DataTypes.createStructField(
            "STATUS",
            DataTypes.StringType,
            false) });
    List<Row> dataRows = new ArrayList<Row>();
    dataRows.add(RowFactory.create("Alex", toDate("2018-01-01 00:00:00"),
        toDate("2018-02-01 00:00:00"), "OUT"));
    dataRows.add(RowFactory.create("Bob", toDate("2018-02-01 00:00:00"),
        toDate("2018-02-05 00:00:00"), "IN"));
    dataRows.add(RowFactory.create("Mark", toDate("2018-02-01 00:00:00"),
        toDate("2018-03-01 00:00:00"), "IN"));
    dataRows.add(RowFactory.create("Mark", toDate("2018-05-01 00:00:00"),
        toDate("2018-08-01 00:00:00"), "OUT"));
    dataRows.add(RowFactory.create("Meggy", toDate("2018-02-01 00:00:00"),
        toDate("2018-02-01 00:00:00"), "OUT"));
    Dataset<Row> dataDf = spark.createDataFrame(dataRows, dataSchema);
    dataDf.show();
    dataDf.printSchema();

    // Header
    StructType headerSchema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "_c1",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "_c2",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "_c3",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "_c4",
            DataTypes.StringType,
            false) });
    List<Row> headerRows = new ArrayList<Row>();
    headerRows.add(RowFactory.create("REQUEST_DATE",
        format.format(new java.util.Date()), "", ""));
    headerRows.add(RowFactory.create("USER", "Kate", "", ""));
    headerRows.add(RowFactory.create("SEARCH_TYPE", "Global", "", ""));
    headerRows.add(RowFactory.create("", "", "", ""));
    headerRows
        .add(RowFactory.create("NAME", "START_DATE", "END_DATE", "STATUS"));
    Dataset<Row> headerDf = spark.createDataFrame(headerRows, headerSchema);
    headerDf.show(false);
    headerDf.printSchema();

    // Transition
    Dataset<Row> transitionDf = dataDf
        .withColumn("_c1", dataDf.col("NAME"))
        .withColumn("_c2",
            dataDf.col("START_DATE").cast(DataTypes.StringType))
        .withColumn("_c3",
            dataDf.col("END_DATE").cast(DataTypes.StringType))
        .withColumn("_c4", dataDf.col("STATUS").cast(DataTypes.StringType))
        .drop("NAME")
        .drop("START_DATE")
        .drop("END_DATE")
        .drop("STATUS");
    transitionDf.show(false);
    transitionDf.printSchema();

    // Union
    Dataset<Row> unionDf = headerDf.unionByName(transitionDf);
    unionDf.show(false);
    unionDf.printSchema();
  }

  private Date toDate(String dateAsText) throws ParseException {
    java.util.Date parsed;
    parsed = format.parse(dateAsText);
    return new Date(parsed.getTime());
  }
}

Я сохранил это как часть моих Spark и Java labs и GitHub .Эквивалентный код Scala, вероятно, будет немного более компактным:).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...