Я обрабатываю паркет от S3 TSV до S3 с помощью клея AWS.Из-за входящих файлов, отличных от UTF-8, я вынужден использовать DataFrames вместо DynamicFrames для обработки моих данных (это известная проблема без обходных путей, что DynamicFrames полностью терпит неудачу с любыми не-UTF8 символами).Это также означает, что я не могу использовать Job Bookmarks в Glue, чтобы отслеживать, какие файлы S3 TSV я уже обработал.
Мой код выглядит так:
# pylint: skip-file
# flake8: noqa
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import split
from awsglue.dynamicframe import DynamicFrame
# @params: [JOB_NAME, s3target]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3target', 's3source'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Define massive list of fields in the schema
fields = [
StructField("accept_language", StringType(), True),
StructField("browser", LongType(), True),
.... huge list ...
StructField("yearly_visitor", ShortType(), True),
StructField("zip", StringType(), True)
]
schema = StructType(fields)
# Read in data using Spark DataFrame and not an AWS Glue DynamicFrame to avoid issues with non-UTF8 characters
df0 = spark.read.format("com.databricks.spark.csv").option("quote", "\"").option("delimiter", u'\u0009').option("charset", 'utf-8').schema(schema).load(args['s3source'] + "/*.tsv.gz")
# Remove all rows that are entirely nulls
df1 = df0.dropna(how = 'all')
# Generate a partitioning column
df2 = df1.withColumn('date', df1.date_time.cast('date'))
# Write out in parquet format, partitioned by date, to the S3 location specified in the arguments
ds2 = df2.write.format("parquet").partitionBy("date").mode("append").save(args['s3target'])
job.commit()
Мой вопрос- без закладок задания каждый раз при запуске он обрабатывает одни и те же файлы s3 снова и снова.Как я могу переместить обработанные файлы в исходной корзине s3 в подпапку или что-то в этом роде или иным образом избежать двойной обработки файлов?
Я не уверен, в чем тут хитрость, поскольку Spark является параллельной системой, идаже не зная, что это за файлы.Думаю, я мог бы создать второе задание Glue с типом задания Python Shell и сразу же удалить входящие файлы, но даже тогда я не уверен, какие файлы удалять и т. Д.
Спасибо,
Chris