AWS Glue Job возвращает ошибку при включении задания в качестве ON - PullRequest
0 голосов
/ 03 марта 2020

У нас есть требование для преобразования существующих исходных файлов S3, которые в настоящее время имеют формат JSON, в паркет. Ниже приведены шаги, предпринятые для достижения того же.

1) Создано задание GLUE с именем "RawEventsToParquet Msgstr ", который будет считывать данные из 'raw_events' (это таблица, которая в настоящее время существует с источником в виде файлов S3 JSON) и форматирует данные в Parquet и сохраняет в новую корзину S3" S3 / подготовленный / события ". 2) Создал сканер "crawlParquetPreparedFiles", который пролистывает вышеупомянутую корзину S3 и создает каталог "prepare_events" и таблицу "prepare_highbond_events". 3) Мы можем запросить Athena и прочитать данные с prepare_events.

Мы также хотим, чтобы JOB просто создавал файлы Parquet для новых файлов S3 JSON и не должен дублировать и загружать уже обработанные файлы, следовательно, после исследования нашел возможность включить предварительное свойство закладка Job. Он работает нормально при первом запуске и был в состоянии запросить данные из созданных файлов Parquet. Однако во втором запуске, в идеале, он не должен создавать никаких файлов паркета, так как у нас нет новых файлов, добавленных в исходный S3, но задание терпит неудачу с ошибкой ниже. Я новичок в AWS, поэтому мне трудно это исправить. Помощь очень ценится. Заранее спасибо.

AnalysisException: '\ nDatasource не поддерживает запись пустых или вложенных пустых схем. \ NПожалуйста, убедитесь, что в схеме данных есть хотя бы один или несколько столбцов. \ N;'

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "raw_events", table_name = "highbond_events", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "raw_events", table_name = "highbond_events", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("anonymousid", "string", "anonymousid", "string"), ("channel", "string", "channel", "string"), ("context", "struct", "context", "struct"), ("event", "string", "event", "string"), ("integrations", "string", "integrations", "string"), ("messageid", "string", "messageid", "string"), ("originaltimestamp", "string", "originaltimestamp", "string"), ("projectid", "string", "projectid", "string"), ("properties", "struct", "properties", "struct"), ("receivedat", "string", "receivedat", "string"), ("sentat", "string", "sentat", "string"), ("timestamp", "string", "timestamp", "string"), ("type", "string", "type", "string"), ("userid", "string", "userid", "string"), ("version", "int", "version", "int"), ("writekey", "string", "writekey", "string"), ("_metadata", "struct", "_metadata", "struct"), ("category", "string", "category", "string"), ("name", "string", "name", "string"), ("traits", "struct", "traits", "struct"), ("groupid", "string", "groupid", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("anonymousid", "string", "anonymousid", "string"), ("channel", "string", "channel", "string"), ("context", "struct", "context", "struct"), ("event", "string", "event", "string"), ("integrations", "string", "integrations", "string"), ("messageid", "string", "messageid", "string"), ("originaltimestamp", "string", "originaltimestamp", "string"), ("projectid", "string", "projectid", "string"), ("properties", "struct", "properties", "struct"), ("receivedat", "string", "receivedat", "string"), ("sentat", "string", "sentat", "string"), ("timestamp", "string", "timestamp", "string"), ("type", "string", "type", "string"), ("userid", "string", "userid", "string"), ("version", "int", "version", "int"), ("writekey", "string", "writekey", "string"), ("_metadata", "struct", "_metadata", "struct"), ("category", "string", "category", "string"), ("name", "string", "name", "string"), ("traits", "struct", "traits", "struct"), ("groupid", "string", "groupid", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://acl-playground-grc-cleansed-data/prepared/events"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://acl-playground-grc-cleansed-data/prepared/events"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
...