При использовании Relationalize в Glue в корневой таблице отсутствует идентификатор - PullRequest
0 голосов
/ 27 сентября 2018

У меня есть DynamicFrame in Glue, и я использую метод Relationalize, который создает мне 3 новых динамических кадра;root_table, root_table_1 и root_table_2.

Когда я печатаю схему таблиц или после того, как я вставил таблицы в базу данных, я заметил, что в root_table идентификатор отсутствует, поэтому я не могу сделатьсоединения между root_table и другими таблицами.

Я перепробовал все возможные комбинации.

Есть ли что-то, что я пропустил?

    datasource1 = Relationalize.apply(frame = renameId, name = "root_ds", transformation_ctx = "datasource1")
print(datasource1.keys())
print(datasource1.values())
for df_name in datasource1.keys():
    m_df = datasource1.select(df_name)
    print "Writing to Redshift table: ", df_name
    m_df.printSchema()

    glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, catalog_connection = "Redshift", connection_options = {"database" : "redshift", "dbtable" : df_name}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "df_to_db")

Ответы [ 2 ]

0 голосов
/ 09 октября 2018

Я использовал приведенный ниже код (удаляя биты импорта) в ваших данных и получил запись в S3.Я получил два файла, вставленные после кода.Я читаю из каталога клея после запуска сканера ваших данных.

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "json_aws_glue_relationalize_stackoverflow", transformation_ctx = "datasource0")

dfc = datasource0.relationalize("advertise_root", "s3://aws-glue-temporary-009551040880-ap-southeast-2/")

for df_name in dfc.keys():
    m_df = dfc.select(df_name)
    print "Writing to S3 file: ", df_name
    datasink2 = glueContext.write_dynamic_frame.from_options(frame = m_df, connection_type = "s3", connection_options = {"path": "s3://aws-glue-relationalize-stackoverflow/" + df_name +"/"}, format = "csv", transformation_ctx = "datasink2")

job.commit()

основной стол, DECLINEREASON, идентификатор, ipHash, lapseTime, oldCommissionAmount, oldSaleAmount, orderRef, originalSaleAmount, paidToPublisher, PaymentID, publisherId, publisherUrl, saleAmount.amount, saleAmount.currency, SiteName, transactionDate, transactionDevice, transactionParts, transactionQueryId, тип, URL, validationDate, voucherCode, voucherCodeUsed, partition_0 AT, 123456, false, 2018-09-05T16: 31: 00, iPhone, «asdsdedrfrgthyjukiloujhrdf45654565423212», www.website.at, 1.5, EUR, в ожидании, AT ,, 321547896, -27670654789123380,68,,,,, false, 0,654987,, 1,0, EUR, https://www.site.at,2018 -09-05T16: 32: 00, iPhone, 1,0, Lead, https://www.website.at,,,false,advertise

Другая таблица для частей транзакцииидентификатор, индекс "transactionParts.val.amount", "transactionParts.val.commissionAmount", "transactionParts.val.commissionGroupCode "," TransactionsParts.val.commissionGroupId "," TransactionsParts.val.commissionGroupName "1,0,1.0,1.5, LEAD, 654654, Lead

Склейка сгенерированного столбца первичного ключа с именем "actionParts" в базовой таблицеа идентификатор в таблице транзакций - это внешний ключ к этому столбцу.Как видите, исходный столбец идентификаторов сохранен как есть.

Можете ли вы попробовать код на ваших данных и посмотреть, работает ли он (изменив имя исходной таблицы в соответствии с вашим)?Попробуйте сначала написать в S3 как CSV, чтобы выяснить, работает ли это.Пожалуйста, дайте мне знать ваши выводы.

0 голосов
/ 01 октября 2018

Это весь код.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db1", 
table_name = "ct_5", transformation_ctx = "datasource0")

dropnullfields3 = DropNullFields.apply(frame = datasource0, transformation_ctx = "dropnullfields3")

renameId = RenameField.apply(frame = dropnullfields3, old_name = "id", new_name = "transaction_id", transformation_ctx = "renameId")

datasource1 = Relationalize.apply(frame = renameId, name = "ds", transformation_ctx = "datasource1")

for df_name in datasource1.keys():
m_df = datasource1.select(df_name)
print "Writing to Redshift table: ", df_name
m_df.printSchema()

glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, catalog_connection = "Redshift", connection_options = {"database" : "dbr", "dbtable" : table_name}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "df_to_db")

Вот запись данных:

{"advertiserCountry": "AT", "advertiserId": 123456, "mendReason ": null, "исправлено": false, "clickDate": "2018-09-05T16: 31: 00", "clickDevice": "iPhone", "clickRefs": {"clickRef2": "asdsdedrfrgthyjukiloujhrdf45654565423212", "clickRef6":"www.website.at"}, "CommissionAmount": {"amount": 1.5, "currency": "EUR"}, "CommissionSharingPublisherId": null, "CommissionStatus": "pending", "customParameters": null, "customerCountry ":" AT "," decreason ": null," id ": 321547896," ipHash ":" -27670654789123380 "," lapseTime ": 68," oldCommissionAmount ": null," oldSaleAmount ": null," orderRef ":null, "originalSaleAmount": null, paidToPublisher: false, "paymentId": 0, "publisherId": 654987, "publisherUrl": "", "saleAmount": {"amount": 1.0, "currency": "EUR"}," siteName ":" https://www.site.at", "TransactionsDate": "2018-09-05T16: 32: 00", "TransactionsDevice": "iPhone", "TransactionsParts": [{"amount": 1.0,«комиссияСумма ": 1.5," CommissionGroupCode ":" LEAD "," CommissionGroupId ": 654654," CommissionGroupName ":" Lead "}], "actionQueryId": 0, "type": "Lead", "url": "https://www.website.at", "validationDate": null, "voucherCode": null, "voucherCodeUsed": false},

...