Добавление столбца метки времени при импорте данных в красное смещение с помощью AWS Glue Job - PullRequest
0 голосов
/ 04 июня 2018

Я хотел бы знать, возможно ли добавить столбец отметки времени в таблицу, когда он загружается с помощью задания склеивания AWS.

Первый сценарий:

Столбец A|Колонка Б |TimeStamp

A | 2 | 2018-06-03 23: 59: 00.0

Когда Crawler обновляет таблицу в каталоге данных и снова запускает задание, таблица добавитновые данные в таблице с новой отметкой времени.

Столбец A |Колонка Б |Отметка времени

A | 4 | 2018-06-04 05: 01: 31,0

B | 8 | 2018-06-04 06: 02: 31,0

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("ColumnA", "char", "ColumnA", "char"), ("ColumnB", "char", "ColumnB", "char")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "TESTDB", connection_options = {"dbtable": "TABLEA", "database": "anasightprd01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")

Ответы [ 2 ]

0 голосов
/ 05 июня 2018

Преобразование DynamicFrame в DataFrame для spark, добавление нового столбца с текущей меткой времени, а затем преобразование его обратно в DynamicFrame перед записью.

import org.apache.spark.sql.functions._

...

val timestampedDf = dropnullfields3.toDF().withColumn("TimeStamp", current_timestamp())
val timestamped4 = DynamicFrame(timestampedDf, glueContext)

Вот как должен выглядеть ваш код Python:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext, DynamicFrame
from awsglue.job import Job
from pyspark.sql.functions import current_timestamp

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("ColumnA", "char", "ColumnA", "char"), ("ColumnB", "char", "ColumnB", "char")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
# add TimeStamp column
timestampedDf = dropnullfields3.toDF().withColumn("TimeStamp", current_timestamp())
timestamped4 = DynamicFrame.fromDF(timestampedDf, glueContext, "timestampedDf")
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = timestamped4, catalog_connection = "TESTDB", connection_options = {"dbtable": "TABLEA", "database": "anasightprd01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
0 голосов
/ 04 июня 2018

Хотя, вероятно, есть способ получить текущую дату и время в вашем коде клея, другой распространенный способ пометки данных о времени при вставке - это добавление столбца данных TIMESTAMP в таблицу Redshift, для которого значение по умолчанию * 1002.*GETDATE()

CREATE TABLE myschema.mytable
(
    ... OTHER Fields here
    insertedtimestamp TIMESTAMP WITH TIME ZONE DEFAULT(GETDATE())
);

Хитрость со вставкой заключается в том, чтобы столбец insertedtimestamp не был указан в полях INSERT INTO или COPY - при добавлении строк в таблицу

INSERT INTO myschema.mytable(Col1, Col2 ...) -- NB no `insertedtimestamp` column
VALUES ('col1', 'col2' ...);

- значение insertedtimestamp будет иметь автоматическую отметку времени

...