Фрейм данных Spark Print без исчерпания памяти - PullRequest
1 голос
/ 12 марта 2019

Как мне распечатать весь фрейм данных в Java без нехватки памяти?

Dataset<Row> df = ...

Я знаю, что:

df.show() 

Показывает фрейм данных, но при достаточно большом фрейме данных возможно, что это может закончиться нехваткой памяти.

Я знаю, что могу ограничить содержание, используя:

df.show(rowCount, false)

Но я хочу напечатать целый фрейм данных, я не хочу ограничивать содержимое ...

Я пытался:

df.foreachPartition(iter -> {
    while(iter.hasNext()){
       System.out.println(rowIter.next().mkString(",");)
     }
});

Но это будет печатать на каждом из соответствующих узлов, а не на драйвере ...

Если есть какой-нибудь способ, которым я могу напечатать все в драйвере без исчерпания памяти?

Ответы [ 2 ]

2 голосов
/ 12 марта 2019

AFAIK, Идея печати фрейма данных состоит в том, чтобы увидеть данные.

Печать большого информационного кадра не рекомендуется, так как возможен размер информационного кадра из памяти.

Я бы предложил следующие способы, если вы хотите просмотреть содержимое, вы можете сохранить его в таблице улья и запросить содержимое. или напишите в csv или json, который доступен для чтения

Примеры:

1) сохранить в таблице улья

df.write.mode("overwrite").saveAsTable("database.tableName")

более поздний запрос из таблицы улья.

2) CSV или JSON

df.write.csv("/your/location/data.csv")
 df.write.json("/your/location/data.json")

Приведенное выше сгенерирует несколько файлов деталей, если вы хотите использовать один файл coalesce(1) (но это снова приведет к перемещению данных на один узел, который не рекомендуется, если только вам это абсолютно не нужно)

Другой вариант - печатать строку за строкой, используя toLocalIterator см. Здесь , который также будет передавать данные на узел ... следовательно, это не очень хорошая идея

0 голосов
/ 12 марта 2019

Вам придется перенести все данные в драйвер, что немного высосет вашу память: (...

Решением может быть разделение вашего информационного кадра и распечатывание фрагментов по частям в драйвере. Конечно, это зависит от структуры самих данных, это будет выглядеть так:

long count = df.count();
long inc = count / 10;
for (long i = 0; i < count; i += inc) {
  Dataset<Row> filteredDf =
      df.where("id>=" + i + " AND id<" + (i + inc));

  List<Row> rows = filteredDf.collectAsList();
  for (Row r : rows) {
    System.out.printf("%d: %s\n", r.getAs(0), r.getString(1));
  }
}

Я разделил набор данных на 10, но я знаю, что мои идентификаторы от 1 до 100 ...

Полный пример может быть:

package net.jgp.books.sparkWithJava.ch20.lab900_splitting_dataframe;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Splitting a dataframe to bring it back to the driver for local
 * processing.
 * 
 * @author jgp
 */
public class SplittingDataframeApp {

  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    SplittingDataframeApp app = new SplittingDataframeApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Splitting a dataframe to collect it")
        .master("local")
        .getOrCreate();

    Dataset<Row> df = createRandomDataframe(spark);
    df = df.cache();

    df.show();
    long count = df.count();
    long inc = count / 10;
    for (long i = 0; i < count; i += inc) {
      Dataset<Row> filteredDf =
          df.where("id>=" + i + " AND id<" + (i + inc));

      List<Row> rows = filteredDf.collectAsList();
      for (Row r : rows) {
        System.out.printf("%d: %s\n", r.getAs(0), r.getString(1));
      }
    }
  }

  private static Dataset<Row> createRandomDataframe(SparkSession spark) {
    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "id",
            DataTypes.IntegerType,
            false),
        DataTypes.createStructField(
            "value",
            DataTypes.StringType,
            false) });

    List<Row> rows = new ArrayList<Row>();
    for (int i = 0; i < 100; i++) {
      rows.add(RowFactory.create(i, "Row #" + i));
    }
    Dataset<Row> df = spark.createDataFrame(rows, schema);
    return df;
  }
}

Как вы думаете, это может помочь?

Это не так элегантно, как сохранение его в базе данных, но позволяет избежать дополнительного компонента в вашей архитектуре. Этот код не очень общий, я не уверен, что вы можете сделать его общим в текущей версии Spark.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...