Watson Studio "Spark Environment" - как увеличить `spark.driver.maxResultSize`? - PullRequest
0 голосов
/ 24 ноября 2018

Я выполняю задание спарк, где я читаю, манипулирую и объединяю множество txt-файлов в один файл, но я решаю эту проблему:

Py4JJavaError: ошибкапроизошло во время вызова o8483.collectToPython.: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: общий размер сериализованных результатов 838 задач (1025,6 МБ) больше, чем spark.driver.maxResultSize (1024,0 МБ)

Isможно увеличить размер spark.driver.maxResultSize?

Примечание. Этот вопрос относится к «средам» WS Spark, а НЕ к Analytics Engine.

1 Ответ

0 голосов
/ 30 ноября 2018

Вы можете увеличить значение по умолчанию через консоль Ambari, если используете экземпляр искрового кластера «Analytics Engine».Вы можете получить ссылку и учетные данные для консоли Ambari из экземпляра IAE в console.bluemix.net.С консоли Ambari добавьте новое свойство в

Spark2 -> «Пользовательские значения по умолчанию для spark2» -> Добавить свойство -> spark.driver.maxResultSize = 2GB

Makeубедитесь, что значения spark.driver.maxResultSize меньше памяти водителя, установленной в

Spark2 -> "Advanced spark2-env" -> content -> SPARK_DRIVER_MEMORY

Еще одно предложение, если вы просто пытаетесь создать один CSV-файл и не хотите изменять значения conf conf, так как вы не знаете, каков будет конечный файл, это использовать функцию, подобную приведенной ниже, которая использует функцию hdfs getmerge длясоздайте один CSV-файл, как pandas.

def writeSparkDFAsCSV_HDFS(spark_df, file_location,file_name, csv_sep=',', csv_quote='"'):
    """
    It can be used to write large spark dataframe as a csv file without running 
    into memory issues while converting to pandas dataframe.
    It first writes the spark df to a temp hdfs location and uses getmerge to create 
    a single file. After adding a header, the merged file is moved to hdfs.

    Args:
        spark_df (spark dataframe) : Data object to be written to file.
        file_location (String) : Directory location of the file.
        file_name (String) : Name of file to write to.
        csv_sep (character) : Field separator to use in csv file
        csv_quote (character) : Quote character to use in csv file
    """
    # define temp and final paths
    file_path= os.path.join(file_location,file_name)
    temp_file_location = tempfile.NamedTemporaryFile().name 
    temp_file_path = os.path.join(temp_file_location,file_name)

    print("Create directories")
    #create directories if not exist in both local and hdfs
    !mkdir $temp_file_location
    !hdfs dfs -mkdir $file_location
    !hdfs dfs -mkdir $temp_file_location

    # write to temp hdfs location
    print("Write to temp hdfs location : {}".format("hdfs://" + temp_file_path))
    spark_df.write.csv("hdfs://" + temp_file_path, sep=csv_sep, quote=csv_quote)


    # merge file from hadoop to local
    print("Merge and put file at {}".format(temp_file_path))
    !hdfs dfs -getmerge $temp_file_path $temp_file_path

    # Add header to the merged file
    header = ",".join(spark_df.columns)
    !rm $temp_file_location/.*crc
    line_prepender(temp_file_path, header)

    #move the final file to hdfs
    !hdfs dfs -put -f $temp_file_path $file_path

    #cleanup temp locations
    print("Cleanup..")
    !rm -rf $temp_file_location
    !hdfs dfs -rm -r $temp_file_location
    print("Done!")
...