EMR кластер искры представить параметры для кода Python - PullRequest
2 голосов
/ 08 июля 2019

У меня есть еще один вопрос, я использую кластер EMR, который имеет это:

Мастер: Running1m4.large

Ядро: Running1m4.2xlarge

иУ меня есть код Python, который разделяет текстовый файл.Моя проблема, когда я пытаюсь выполнить свой код сейчас, я использую просто:

spark-submit <my_python_file>.py

, и результат:

[hadoop@ip-192-168-97-253 test_0]$ spark-submit 3.py
19/07/08 13:28:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/07/08 13:28:11 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000654650000, 384626688, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 384626688 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/hadoop/test_0/hs_err_pid30413.log
[hadoop@ip-192-168-97-253 test_0]$ ls -la
total 364
drwxrwxr-x 2 hadoop hadoop   4096 Jul  8 13:29 .
drwxr-xr-x 5 hadoop hadoop   4096 Jul  8 13:22 ..
-rw-rw-r-- 1 hadoop hadoop   1974 Jul  3 23:32 3.py

ЧтоЯвляются ли опции Rigth, которые я должен написать здесь, чтобы использовать все преимущества EMR?

это мой код Python:

from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, TimestampType, StringType, DateType
from pyspark.sql.functions import array, col, explode, struct, lit



spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext


fuente_1 = "s3://someplace/data_1.txt"
save_folder = "s3://anotherplace/converted"



new_set_1 = []


def split_txt_1(str):
    global new_set_1

    start_psotion = 0
    index = 0
    prev = 0
    # str = mylines[0]
    substr = u"\u001E"
    registro = 0

    while index < len(str):
        index = str.find(substr, index)


        if index == -1:
            new_set_1.append(str[start_psotion:len(str)])
            start_psotion = index + len(substr)

            my_df_1 = spark.createDataFrame(new_set_1, StringType())
            final_2 = my_df_1.repartition(1)
            final_2.write.csv(path=save_folder, mode="append")


        else:
            new_set_1.append(str[start_psotion:index])
            start_psotion = index + len(substr)

            if registro == 100:
                my_df_1 = spark.createDataFrame(new_set_1, StringType())
                final_2 = my_df_1.repartition(1)
                final_2.write.csv(path=save_folder, mode="append")
                new_set_1 = []
                registro = 0
            else:
                registro += len(substr)

        if index == -1:
            break

        prev = index + len(substr)
        index += len(substr)


lines_1 = sc.textFile(fuente_1)

llist_1 = lines_1.collect()

split_txt_1(llist_1[0])

И это очень маленький примерTXT-файл, очевидно, реальный файл содержит много информации (и все конкатенации с вами "\ u001E")

********************** file.txt **************************************

768|| c-234r4 | Хулио | 38 | c-123 | 123a-533r2 | Ана | 32 | c-123 | 32a-543r4 | Соня | 33 | c-v23 | 43

********************** end file.txt **********************************

вот так выглядит мой файл: image

Есть ли здесь какие-то преимущества, пожалуйста?Что я делаю не так?

С уважением

...