Память драйвера Apache Spark - PullRequest
2 голосов
/ 21 марта 2019

Я пытался установить и запустить простой Java Apache Spark в intellij для Windows, но у меня есть ошибка, которую я не могу решить. Я установил искру через Maven. Я получаю эту ошибку:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/03/20 23:53:23 INFO SparkContext: Running Spark version 2.0.0-cloudera1-SNAPSHOT
19/03/20 23:53:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/20 23:53:24 INFO SecurityManager: Changing view acls to: Drakker
19/03/20 23:53:24 INFO SecurityManager: Changing modify acls to: Drakker
19/03/20 23:53:24 INFO SecurityManager: Changing view acls groups to: 
19/03/20 23:53:24 INFO SecurityManager: Changing modify acls groups to: 
19/03/20 23:53:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(Drakker); groups with view permissions: Set(); users  with modify permissions: Set(Drakker); groups with modify permissions: Set()
19/03/20 23:53:25 INFO Utils: Successfully started service 'sparkDriver' on port 50007.
19/03/20 23:53:25 INFO SparkEnv: Registering MapOutputTracker
19/03/20 23:53:25 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.
    at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:212)
    at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:194)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:308)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:165)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:260)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:429)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at Spark.App.main(App.java:16)
19/03/20 23:53:25 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.
    at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:212)
    at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:194)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:308)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:165)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:260)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:429)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at Spark.App.main(App.java:16)

Я попытался установить память драйвера вручную, но это не сработало. Я также попытался установить спарк локально, но изменение памяти драйвера из командной строки не помогло.

Это код:

package Spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class App 
{
    public static void main( String[] args )
    {
        SparkConf conf = new SparkConf().setAppName("Spark").setMaster("local");
//        conf.set("spark.driver.memory","471859200");
        JavaSparkContext sc = new JavaSparkContext(conf);


        List<Integer> data= Arrays.asList(1,2,3,4,5,6,7,8,9,1,2,3,4,5,6,7,8,9);
        JavaRDD<Integer> rdd=sc.parallelize(data);
        JavaRDD<Integer> list=rdd.map(s->s);
        int totalLines=list.reduce((a,b)->a+b);
        System.out.println(totalLines);
    }
}

Я получаю ошибку при создании экземпляра JavaSparkContext. У кого-нибудь есть идеи, как это решить?

Спасибо!

Ответы [ 3 ]

1 голос
/ 21 марта 2019

Если вы используете Eclipse, вы можете установить Run> Run Configurations...> Arguments> VM arguments and set max heap size like -Xmx512m.

По идее вы можете установить Run\Debug Configurations> VM options : -Xmx512m

В своем коде вы можете попробовать это conf.set("spark.testing.memory", "2147480000")

1 голос
/ 21 марта 2019

Меня немного смущает ваш код, так как он смешивает конструкцию до Spark 2.x, как SparkConf и много RDD. Их использование не является неправильным, но, начиная с Spark 2.x, все немного по-другому.

Вот пример использования SparkSession и фреймов данных, которые являются расширенной, более мощной версией RDD (для краткости).

В этом примере вы увидите несколько способов выполнения операций map / Reduce: два с map / reduction и один с простым SQL-подобным синтаксисом.

отобразить и уменьшить с помощью getAs ()

int totalLines = df
    .map(
        (MapFunction<Row, Integer>) row -> row.<Integer>getAs("i"),
        Encoders.INT())
    .reduce((a, b) -> a + b);
System.out.println(totalLines);

отобразить и уменьшить с помощью getInt ()

totalLines = df
    .map(
        (MapFunction<Row, Integer>) row -> row.getInt(0),
        Encoders.INT())
    .reduce((a, b) -> a + b);
System.out.println(totalLines);

SQL-типа

Это, наверное, самый популярный.

long totalLinesL = df.selectExpr("sum(*)").first().getLong(0);
System.out.println(totalLinesL);

Полный пример

package net.jgp.books.spark.ch07.lab990_others;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Simple ingestion followed by map and reduce operations.
 * 
 * @author jgp
 */
public class SelfIngestionApp {

  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    SelfIngestionApp app = new SelfIngestionApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Self ingestion")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = createDataframe(spark);
    df.show(false);

    // map and reduce with getAs()
    int totalLines = df
        .map(
            (MapFunction<Row, Integer>) row -> row.<Integer>getAs("i"),
            Encoders.INT())
        .reduce((a, b) -> a + b);
    System.out.println(totalLines);

    // map and reduce with getInt()
    totalLines = df
        .map(
            (MapFunction<Row, Integer>) row -> row.getInt(0),
            Encoders.INT())
        .reduce((a, b) -> a + b);
    System.out.println(totalLines);

    // SQL-like
    long totalLinesL = df.selectExpr("sum(*)").first().getLong(0);
    System.out.println(totalLinesL);
  }

  private static Dataset<Row> createDataframe(SparkSession spark) {
    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "i",
            DataTypes.IntegerType,
            false) });

    List<Integer> data =
        Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9);
    List<Row> rows = new ArrayList<>();
    for (int i : data) {
      rows.add(RowFactory.create(i));
    }

    return spark.createDataFrame(rows, schema);
  }
}
0 голосов
/ 21 марта 2019

Вы можете попробовать использовать Spark Session Builder и получить контекст искры с помощью spark.sparkContext ()

public static SparkSession sparkSession(String master,
                                        String appName) {
return    SparkSession.builder().appName(appName)
                       .master(master)
                       .config("spark.dynamicAllocation.enabled", true)
                       .config("spark.shuffle.service.enabled", true)
                       .config("spark.driver.maxResultSize", "8g")
                       .config("spark.executor.memory", "8g")
                       .config("spark.executor.cores", "4")
                       .config("spark.cores.max", "6")
                       .config("spark.submit.deployMode", "client")
                       .config("spark.network.timeout", "3600s")
                       .config("spark.eventLog.enabled", true)
                       .getOrCreate();
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...