AWS Glue пишут только самые новые паркетные перегородки - PullRequest
0 голосов
/ 21 октября 2019

У меня есть клейкая база данных, в которой есть две таблицы, в каждой из которых одни и те же данные разделены по-разному. Я пытаюсь написать задание, которое выполняется по ночам, считывает данные из одной таблицы, а затем записывает новые данные с обновленными разделами. Я могу сделать это с помощью следующего кода:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import lit

glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "Database",
    table_name = "Table",
    transformation_ctx = "datasource0"
)

datasource0 = datasource0.toDF()

datasource0.write.partitionBy("Key1","Key2").parquet(OutputFilePath)

Но для этого потребуется записать весь фрейм данных. Я только хочу написать новые разделы, поэтому я нашел следующий фрагмент на веб-сайте AWS:

glue_context.write_dynamic_frame.from_options(
    frame = projectedEvents,
    connection_type = "s3",    
    connection_options = {"path": "$outpath", "partitionKeys": ["type"]},
    format = "parquet")

Но это просто переписывает также весь фрейм данных. Как я могу просто переписать самые новые разделы?

Ответы [ 2 ]

0 голосов
/ 31 октября 2019

Это можно сделать с помощью аргумента push_down_predicate. Первоначально данные были разделены по годам, месяцам, дням и часам, поэтому я просто вычел один день, а затем использовал push_down_predicate следующим образом:

timestamp = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
s1 = timestamp.split('-')

pdp = "partition_0 = " + s1[0] + " and partition_1 = " + s1[1] + " and partition_2 = " + s1[2]

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "mailfiles_standardized", 
    table_name = "firehoseoutput", 
    push_down_predicate = pdp
)

glueContext.write_dynamic_frame.from_options(
frame = datasource2,
connection_type = "s3",
connection_options = {
    "path": Bucket, 
    "partitionKeys": ["Key1","Key2"]
    },
format = "parquet")
0 голосов
/ 22 октября 2019

Возможно, посмотрите на закладки, которые работают как механизм контрольных точек, чтобы избежать повторной обработки данных, которые были ранее обработаны: https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

...