У меня есть задание AWS Glue Python, которое загружает данные из MySQL в файлы S3 после того, как они больше не требуются.
Закладки для этой работы отключены, однако я продолжаю получать
IllegalArgumentException: "ключи закладки задания Список (ip) не соответствует сохраненным ключам Set (id). Используйте ResetJobBookmark, чтобы очистить существующую закладку задания.
сообщение об ошибке.
Кто-нибудь сталкивался с подобным поведением клея AWS?
Подробные журналы:
Файл "/mnt/yarn/usercache/root/appcache/application_1572881176134_0001/container_1572881176134_0001_01_000001/py4jsrc.zip/py4j/protocol.py ", строка 328, в формате get_return_value (target_id,". ", name), value) py4j.protocol.Py4JJavaError: Произошла ошибка при вызове o91.getDynamicFrame.: java.lang.IllegalArgumentException: ключи закладки задания Список (ip) не соответствует сохраненным ключам Установить (id). Используйте ResetJobBookmark, чтобы очистить существующую закладку задания.
Я сбросил задание Boomkmark для заданияОднако безуспешно.
Код, который я использую:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TABLES'])
#connection init
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
#create a list from input argument
tables = args['TABLES'].split(",")
#loop through table list
for table in tables:
table = table.strip() #remove spaces
#connect to table
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "prod", table_name = ("prod_" + table), transformation_ctx = "datasource0")
#drop fields
dropnullfields3 = DropNullFields.apply(frame = datasource0, transformation_ctx = "dropnullfields3")
#only one output file
dropnullfields3 = dropnullfields3.repartition(1)
#set path
s3_path = "s3://aws-glue-output1/" + table
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": s3_path}, format = "parquet", transformation_ctx = "datasink4")
job.commit()