У меня есть еще один вопрос, я использую кластер EMR, который имеет это:
Мастер: Running1m4.large
Ядро: Running1m4.2xlarge
иУ меня есть код Python, который разделяет текстовый файл.Моя проблема, когда я пытаюсь выполнить свой код сейчас, я использую просто:
spark-submit <my_python_file>.py
, и результат:
[hadoop@ip-192-168-97-253 test_0]$ spark-submit 3.py
19/07/08 13:28:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/07/08 13:28:11 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000654650000, 384626688, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 384626688 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/hadoop/test_0/hs_err_pid30413.log
[hadoop@ip-192-168-97-253 test_0]$ ls -la
total 364
drwxrwxr-x 2 hadoop hadoop 4096 Jul 8 13:29 .
drwxr-xr-x 5 hadoop hadoop 4096 Jul 8 13:22 ..
-rw-rw-r-- 1 hadoop hadoop 1974 Jul 3 23:32 3.py
ЧтоЯвляются ли опции Rigth, которые я должен написать здесь, чтобы использовать все преимущества EMR?
это мой код Python:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, TimestampType, StringType, DateType
from pyspark.sql.functions import array, col, explode, struct, lit
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
sc = spark.sparkContext
fuente_1 = "s3://someplace/data_1.txt"
save_folder = "s3://anotherplace/converted"
new_set_1 = []
def split_txt_1(str):
global new_set_1
start_psotion = 0
index = 0
prev = 0
# str = mylines[0]
substr = u"\u001E"
registro = 0
while index < len(str):
index = str.find(substr, index)
if index == -1:
new_set_1.append(str[start_psotion:len(str)])
start_psotion = index + len(substr)
my_df_1 = spark.createDataFrame(new_set_1, StringType())
final_2 = my_df_1.repartition(1)
final_2.write.csv(path=save_folder, mode="append")
else:
new_set_1.append(str[start_psotion:index])
start_psotion = index + len(substr)
if registro == 100:
my_df_1 = spark.createDataFrame(new_set_1, StringType())
final_2 = my_df_1.repartition(1)
final_2.write.csv(path=save_folder, mode="append")
new_set_1 = []
registro = 0
else:
registro += len(substr)
if index == -1:
break
prev = index + len(substr)
index += len(substr)
lines_1 = sc.textFile(fuente_1)
llist_1 = lines_1.collect()
split_txt_1(llist_1[0])
И это очень маленький примерTXT-файл, очевидно, реальный файл содержит много информации (и все конкатенации с вами "\ u001E")
********************** file.txt **************************************
768|| c-234r4 | Хулио | 38 | c-123 | 123a-533r2 | Ана | 32 | c-123 | 32a-543r4 | Соня | 33 | c-v23 | 43
********************** end file.txt **********************************
вот так выглядит мой файл:
Есть ли здесь какие-то преимущества, пожалуйста?Что я делаю не так?
С уважением