У меня есть AWS Клей Python Работа с оболочкой, которая не выполняется примерно через минуту после обработки текстового файла объемом 2 ГБ. Задание вносит незначительные изменения в файл, такие как поиск и удаление некоторых строк, удаление последнего символа в строке и добавление возврата каретки в зависимости от условий. То же задание отлично работает для файлов размером менее 1 ГБ.
- Задание «Максимальная емкость» равно 1.
- «Максимальный параллелизм» равен 2880.
- «Тайм-аут задания (минут)» равно 900.
Подробное сообщение об ошибке:
Traceback (most recent call last):
File "/tmp/runscript.py", line 142, in <module>
raise e_type(e_value).with_traceback(new_stack)
File "/tmp/glue-python-scripts-9g022ft7/pysh-tf-bb-to-parquet.py", line 134, in <module>
MemoryError
Фактический python код, который я пытаюсь запустить:
import boto3
import json
import os
import sys
from sys import getsizeof
import datetime
from datetime import datetime
import os
import psutil
import io
import pandas as pd
import pyarrow as pa #not supported by glue
import pyarrow.parquet as pq #not supported by glue
import s3fs #not supported by glue
#Object parameters (input and output).
s3region = 'reducted'
s3bucket_nm = 'reducted'
#s3 inbound object parameters.
s3object_inbound_key_only = 'reducted'
s3object_inbound_folder_only = 'reducted'
s3object_inbound_key = s3object_inbound_folder_only + '/' + s3object_inbound_key_only
#s3 object base folder parameter.
s3object_base_folder = s3object_inbound_key_only[:-9].replace('.', '_')
#s3 raw object parameters.
s3object_raw_key_only = s3object_inbound_key_only
s3object_raw_folder_only = 'reducted' + s3object_base_folder
s3object_raw_key = s3object_raw_folder_only + '/' + s3object_inbound_key_only
#s3 PSV object parameters.
s3object_psv_key_only = s3object_inbound_key_only + '.psv'
s3object_psv_folder_only = 'reducted' + s3object_base_folder + '_psv'
s3object_psv_key = s3object_psv_folder_only + '/' + s3object_psv_key_only
s3object_psv_crawler = s3object_base_folder + '_psv'
glue_role = 'reducted'
processed_immut_db = 'reducted'
#Instantiate s3 client.
s3client = boto3.client(
's3',
region_name = s3region
)
#Instantiate s3 resource.
s3resource = boto3.resource(
's3',
region_name = s3region
)
#Store raw object metadata as a dictionary variable.
s3object_raw_dict = {
'Bucket': s3bucket_nm,
'Key': s3object_inbound_key
}
#Create raw file object.
s3object_i = s3client.get_object(
Bucket = s3bucket_nm,
Key = s3object_raw_folder_only + '/' + s3object_raw_key_only
)
#Initialize the list to hold the raw file data string.
l_data = []
#Load s_data string into a list and transform.
for line in (''.join((s3object_i['Body'].read()).decode('utf-8'))).splitlines():
#Once the line with the beginning of the field list tag is reached, re-initialize the list.
if line.startswith('START-OF-FIELDS'):
l_data = []
#Load (append) the input file into the list.
l_data.append(line + '\n')
#Once the line with the end of the field list tag is reached, remove the field metadata tags.
if line.startswith('END-OF-FIELDS'):
#Remove the blank lines.
l_data=[line for line in l_data if '\n' != line]
#Remove lines with #.
l_data=[line for line in l_data if '#' not in line]
#Remove the tags signifying the the start and end of the field list.
l_data.remove('START-OF-FIELDS\n')
l_data.remove('END-OF-FIELDS\n')
#Remove the new line characters (\n) from each field name (assuming the last character in each element).
l_data=list(map(lambda i: i[:-1], l_data))
#Insert "missing" field names in the beginning of the header.
l_data.insert(0, 'BB_FILE_DT')
l_data.insert(1, 'BB_ID')
l_data.insert(2, 'RETURN_CD')
l_data.insert(3, 'NO_OF_FIELDS')
#Add | delimiter to each field.
l_data=[each + "|" for each in l_data]
#Concatenate all header elements into a single element.
l_data = [''.join(l_data[:])]
#Once the line with the end of data dataset tag is reached, remove the dataset metadata tags.
if line.startswith('END-OF-FILE'):
#Remove TIMESTARTED metadata.
l_data=[line for line in l_data if 'TIMESTARTED' not in line]
#Remove lines with #.
l_data=[line for line in l_data if '#' not in line]
#Remove the tags signifying the the start and end of the dataset.
l_data.remove('START-OF-DATA\n')
l_data.remove('END-OF-DATA\n')
#Remove DATARECORDS metadata.
l_data=[line for line in l_data if 'DATARECORDS' not in line]
#Remove TIMEFINISHED metadata.
l_data=[line for line in l_data if 'TIMEFINISHED' not in line]
#Remove END-OF-FILE metadata.
l_data=[line for line in l_data if 'END-OF-FILE' not in line]
#Store the file header into a variable.
l_data_header=l_data[0][:-1] + '\n'
#Add the column with the name of the inbound file to all elements of the file body.
l_data_body=[s3object_inbound_key_only[-8:] + '|' + line[:-2] + '\n' for line in l_data[2:]]
#Combine the file header and file body into a single list.
l_data_body.insert(0, l_data_header)
#Load the transformed list into a string variable.
s3object_o_data = ''.join(l_data_body)
#Write the transformed list from a string variable to a new s3 object.
s3resource.Object(s3bucket_nm, s3object_psv_folder_only + '/' + s3object_psv_key_only).put(Body=s3object_o_data)
Я определил, что «MemoryError» вызвана строкой кода ниже. s3object_i_data_decoded содержит файл размером 2 ГБ, о котором я упоминал ранее. Общий объем памяти, занятый процессом python до выполнения этой строки кода, составляет 2,025 ГБ. Похоже, использование памяти резко увеличивается после выполнения этой строки кода:
#Load the transformed list into a string variable.
s3object_o_data = ''.join(l_data_body)
После измерения объема памяти процесса во время выполнения кода я обнаружил, что всякий раз, когда переменная списка загружается в другую переменную, величина памяти используется почти в четыре раза. Таким образом, переменная списка 2 ГБ при назначении другой переменной приводит к увеличению объема памяти процесса до 6+ ГБ. : /
Я также предполагаю, что Glue Python Работа с оболочкой имеет трудности с обработкой файлов, размер которых превышает 2 ГБ ... Кто-нибудь может подтвердить это?
- Кто-нибудь еще сталкивался с этой ошибкой, когда Обработка файлов размером более 2 ГБ?
- Можно ли выполнить какие-либо настройки, чтобы избежать этого "MemoryError"?
- Не слишком ли велики наборы данных объемом 2 ГБ для Glue Python Shell Job и, возможно, Glue Spark должна следует учитывать.
Я мог бы теоретически разделить работу на более мелкие партии с помощью самого кода, но хотел посмотреть, есть ли более низкие висячие фрукты.
Я бы очень хотел настроить существующую работу и избегать использования Glue Spark, если в этом нет необходимости.
Заранее всем спасибо за то, что поделились своими идеями! :)