Я пытаюсь использовать инструмент AWS Glue ETL для анализа файла CSV в Parquet. Я следую этому учебнику с сайта Amazon.
Поскольку у меня есть куча CSV-файлов в корзине S3, я сохраняю их содержимое сжатым как GZIP. Я использую следующую лямбда-функцию для загрузки данных из zip-файла, извлечения CSV-файлов и сохранения их в виде сжатого CSV-файла в моем хранилище S3:
import json
import boto3
from botocore.vendored import requests
import zipfile
from gzip import GzipFile
from io import BytesIO
def lambda_handler(event, context):
s3 = boto3.resource('s3')
bucket_name = "my-bucket"
file_url = 'http://dados.cvm.gov.br/dados/FI/DOC/CDA/DADOS/'
file_name = "cda_fi_201801"
req = requests.get(file_url + file_name+".zip", stream=True)
data = req.raw.read()
zf = zipfile.ZipFile(BytesIO(data))
for fn in zf.namelist():
bytes = zf.read(fn).decode("windows-1252")
print ('File:', fn)
print ('has',len(bytes),'bytes')
# Choose folder name to put csv file
parts = fn.split("_")
folder = "PL"
if(parts[2] == "BLC"):
folder = "BLC_"+parts[3]
# BytesIO to not save to disk
gz_body = BytesIO()
gz = GzipFile(None, 'wb', 9, gz_body)
# Write csv bytes to gzip body
gz.write(bytes.encode('utf8'))
s3.Bucket(bucket_name).put_object(Key=folder + "/" + fn,
ContentType="text/plain",
ContentEncoding='gzip',
Body=gz_body.getvalue())
gz.close()
return {
'statusCode': 200,
}
Запустив AWS Glue Crawler в папке blc_1, я получил следующие свойства таблицы:
{
"StorageDescriptor": {
"cols": {
"FieldSchema": [
{
"name": "tp_fundo",
"type": "string",
"comment": ""
},
{
"name": "cnpj_fundo",
"type": "string",
"comment": ""
},
{
"name": "denom_social",
"type": "string",
"comment": ""
},
{
"name": "dt_comptc",
"type": "string",
"comment": ""
},
{
"name": "tp_aplic",
"type": "string",
"comment": ""
},
{
"name": "tp_ativo",
"type": "string",
"comment": ""
},
{
"name": "emissor_ligado",
"type": "string",
"comment": ""
},
{
"name": "tp_negoc",
"type": "string",
"comment": ""
},
{
"name": "qt_venda_negoc",
"type": "double",
"comment": ""
},
{
"name": "vl_venda_negoc",
"type": "double",
"comment": ""
},
{
"name": "qt_aquis_negoc",
"type": "double",
"comment": ""
},
{
"name": "vl_aquis_negoc",
"type": "double",
"comment": ""
},
{
"name": "qt_pos_final",
"type": "double",
"comment": ""
},
{
"name": "vl_merc_pos_final",
"type": "double",
"comment": ""
},
{
"name": "vl_custo_pos_final",
"type": "string",
"comment": ""
},
{
"name": "dt_confid_aplic",
"type": "string",
"comment": ""
},
{
"name": "tp_titpub",
"type": "string",
"comment": ""
},
{
"name": "cd_isin",
"type": "string",
"comment": ""
},
{
"name": "cd_selic",
"type": "bigint",
"comment": ""
},
{
"name": "dt_emissao",
"type": "string",
"comment": ""
},
{
"name": "dt_venc",
"type": "string",
"comment": ""
}
]
},
"location": "s3://my-bucket/BLC_1/",
"inputFormat": "org.apache.hadoop.mapred.TextInputFormat",
"outputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"compressed": "true",
"numBuckets": "-1",
"SerDeInfo": {
"name": "",
"serializationLib": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"parameters": {
"field.delim": ";"
}
},
"bucketCols": [],
"sortCols": [],
"parameters": {
"skip.header.line.count": "1",
"sizeKey": "731056",
"objectCount": "1",
"UPDATED_BY_CRAWLER": "blc-1",
"CrawlerSchemaSerializerVersion": "1.0",
"recordCount": "1884",
"averageRecordSize": "258",
"CrawlerSchemaDeserializerVersion": "1.0",
"compressionType": "gzip",
"classification": "csv",
"columnsOrdered": "true",
"areColumnsQuoted": "false",
"delimiter": ";",
"typeOfData": "file"
},
"SkewedInfo": {},
"storedAsSubDirectories": "false"
},
"parameters": {
"skip.header.line.count": "1",
"sizeKey": "731056",
"objectCount": "1",
"UPDATED_BY_CRAWLER": "blc-1",
"CrawlerSchemaSerializerVersion": "1.0",
"recordCount": "1884",
"averageRecordSize": "258",
"CrawlerSchemaDeserializerVersion": "1.0",
"compressionType": "gzip",
"classification": "csv",
"columnsOrdered": "true",
"areColumnsQuoted": "false",
"delimiter": ";",
"typeOfData": "file"
}
}
После этого я последовал руководству и попытался запустить задание ETL с помощью следующего скрипта, автоматически сгенерированного Glue:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "cvm", table_name = "blc_1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "cvm", table_name = "blc_1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("tp_fundo", "string", "tp_fundo", "string"), ("cnpj_fundo", "string", "cnpj_fundo", "string"), ("denom_social", "string", "denom_social", "string"), ("dt_comptc", "string", "dt_comptc", "string"), ("tp_aplic", "string", "tp_aplic", "string"), ("tp_ativo", "string", "tp_ativo", "string"), ("emissor_ligado", "string", "emissor_ligado", "string"), ("tp_negoc", "string", "tp_negoc", "string"), ("qt_venda_negoc", "double", "qt_venda_negoc", "double"), ("vl_venda_negoc", "double", "vl_venda_negoc", "double"), ("qt_aquis_negoc", "double", "qt_aquis_negoc", "double"), ("vl_aquis_negoc", "double", "vl_aquis_negoc", "double"), ("qt_pos_final", "double", "qt_pos_final", "double"), ("vl_merc_pos_final", "double", "vl_merc_pos_final", "double"), ("vl_custo_pos_final", "string", "vl_custo_pos_final", "string"), ("dt_confid_aplic", "string", "dt_confid_aplic", "string"), ("tp_titpub", "string", "tp_titpub", "string"), ("cd_isin", "string", "cd_isin", "string"), ("cd_selic", "long", "cd_selic", "long"), ("dt_emissao", "string", "dt_emissao", "string"), ("dt_venc", "string", "dt_venc", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("tp_fundo", "string", "tp_fundo", "string"), ("cnpj_fundo", "string", "cnpj_fundo", "string"), ("denom_social", "string", "denom_social", "string"), ("dt_comptc", "string", "dt_comptc", "string"), ("tp_aplic", "string", "tp_aplic", "string"), ("tp_ativo", "string", "tp_ativo", "string"), ("emissor_ligado", "string", "emissor_ligado", "string"), ("tp_negoc", "string", "tp_negoc", "string"), ("qt_venda_negoc", "double", "qt_venda_negoc", "double"), ("vl_venda_negoc", "double", "vl_venda_negoc", "double"), ("qt_aquis_negoc", "double", "qt_aquis_negoc", "double"), ("vl_aquis_negoc", "double", "vl_aquis_negoc", "double"), ("qt_pos_final", "double", "qt_pos_final", "double"), ("vl_merc_pos_final", "double", "vl_merc_pos_final", "double"), ("vl_custo_pos_final", "string", "vl_custo_pos_final", "string"), ("dt_confid_aplic", "string", "dt_confid_aplic", "string"), ("tp_titpub", "string", "tp_titpub", "string"), ("cd_isin", "string", "cd_isin", "string"), ("cd_selic", "long", "cd_selic", "long"), ("dt_emissao", "string", "dt_emissao", "string"), ("dt_venc", "string", "dt_venc", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://my-bucket/blc_1"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://my-bucket/blc_1"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
Я получил следующую ошибку:
19/03/27 19:10:07 WARN TaskSetManager: потерянное задание 0.0 на этапе 0.0 (TID
0, ip-172-32-89-229.us-east-2.compute.internal, исполнитель 1):
com.amazonaws.services.glue.util.FatalException: невозможно проанализировать файл:
cda_fi_BLC_1_201801.csv
в
com.amazonaws.services.glue.readers.JacksonReader.hasNextFailSafe (JacksonReader.scala: 94)
в
com.amazonaws.services.glue.readers.JacksonReader.hasNext (JacksonReader.scala: 38)
в
com.amazonaws.services.glue.hadoop.TapeHadoopRecordReader.nextKeyValue (TapeHadoopRecordReader.scala: 63)
в
org.apache.spark.rdd.NewHadoopRDD $$ Анон $ 1.hasNext (NewHadoopRDD.scala: 207)
в
org.apache.spark.InterruptibleIterator.hasNext (InterruptibleIterator.scala: 37)
на scala.collection.Iterator $$ anon $ 11.hasNext (Iterator.scala: 408) на
scala.collection.Iterator $$ anon $ 11.hasNext (Iterator.scala: 408) в
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write (BypassMergeSortShuffleWriter.java:148)
в
org.apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 96)
в org.apache.spark.scheduler.ShuffleMapTask.
К сожалению, он не дает ни малейшего представления о том, что может происходить. Я смог выполнить ETL, когда содержимое CSV не было сжато, поэтому я предпочитаю, что я что-то делаю неправильно при сжатии gzip или отсутствует какая-либо конфигурация.
Если у вас есть представление о том, что происходит, я ценю некоторую помощь.
Спасибо!