Разбор вложенных файлов с помощью Pyspark Glue - PullRequest
1 голос
/ 06 мая 2020

У меня есть вложенный файл json с такой структурой:

    {
        "value": {
            "employee": {
                 "employeeid": "1234",
                "employeename": "ABCD",
                 "contactNumber": [
                            {
                                "type": "Work",

                                "phoneNumber": "1234567890"
                            },
                            {
                                "type": "Home",

                                "phoneNumber": "0987654321"
                            }
                        ] }}}

Мне нужно сгладить этот массив, присутствующий в json, и записать в таблицу в Amazon RDS с помощью клея. Я попытался создать следующий фрагмент:

 import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "test", table_name = "employee", transformation_ctx = "datasource0")
    relationalized_json = datasource0.relationalize(root_table_name = "root", staging_path = args["TempDir"])
    root_df = relationalized_json.select('root')
    applymapping1 = ApplyMapping.apply(frame = root_df, mappings= [("`value.employee.employeeid`","string", "employeeid","string"),("`value.employee.employeename`","string", "employeename","string"), ("`value.employee.employeename.contactNumber.value.type`","string", "type","string"),("`value.employee.employeename.contactNumber.value.phoneNumber`","string", "phoneNumber","string")]transformation_ctx = "applymapping1")
    datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping1, catalog_connection = "test", connection_options = {"dbtable": "employee", "database": "testdb"}, transformation_ctx = "datasink1")

Он заполняет значения для всех столбцов, кроме типа и номера телефона. Тип и номер телефона заполняются пустыми. Сообщите мне, пожалуйста, о проблеме?

1 Ответ

0 голосов
/ 06 мая 2020

Я изменил код следующим образом:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import *
from pyspark.sql import functions as F
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "test", table_name = "taskjson_taskorderassignment_0429_2020_05_01_json", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "test", table_name = "employee", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("id", "string", "id", "string"), ("type", "string", "type", "string"), ("datacontenttype", "string", "datacontenttype", "string"), ("time", "string", "time", "string"), ("specversion", "string", "specversion", "string"), ("source", "string", "source", "string"), ("data.taskOrder.taskOrderIdentifier", "string", "`data.taskOrder.taskOrderIdentifier`", "string"), ("data.taskOrder.taskOrderTypeCode", "string", "`data.taskOrder.taskOrderTypeCode`", "string"), ("data.taskOrder.taskOrderRequestDateTime", "string", "`data.taskOrder.taskOrderRequestDateTime`", "string"), ("data.taskOrder.taskOrderRequestedPickupTime", "string", "`data.taskOrder.taskOrderRequestedPickupTime`", "string"), ("data.taskOrder.taskOrderRequestedDeliveryTime", "string", "`data.taskOrder.taskOrderRequestedDeliveryTime`", "string"), ("data.taskOrder.taskOrderRequestedCounselingDate", "string", "`data.taskOrder.taskOrderRequestedCounselingDate`", "string"), ("data.taskOrder.taskOrderPackagingDate", "string", "`data.taskOrder.taskOrderPackagingDate`", "string"), ("data.taskOrder.movePriorityCode", "string", "`data.taskOrder.movePriorityCode`", "string"), ("data.taskOrder.isSafeMove", "boolean", "`data.taskOrder.isSafeMove`", "boolean"), ("data.taskOrder.restrictionIndicator", "boolean", "`data.taskOrder.restrictionIndicator`", "boolean"), ("data.taskOrder.taskOrderRequester", "string", "`data.taskOrder.taskOrderRequester`", "string"), ("data.taskOrder.taskOrderApprovedbyUser", "string", "`data.taskOrder.taskOrderApprovedbyUser`", "string"), ("data.taskOrder.taskOrderApprovedDateTime", "string", "`data.taskOrder.taskOrderApprovedDateTime`", "string"), ("data.taskOrder.taskOrderStatus", "string", "`data.taskOrder.taskOrderStatus`", "string"), ("data.taskOrder.taskOrderNotes", "string", "`data.taskOrder.taskOrderNotes`", "string"), ("data.taskOrder.isVIP", "boolean", "`data.taskOrder.isVIP`", "boolean"), ("data.taskOrder.isBlueBark", "boolean", "`data.taskOrder.isBlueBark`", "boolean"), ("data.taskOrder.isEntitlementOrdered", "boolean", "`data.taskOrder.isEntitlementOrdered`", "boolean"), ("data.taskOrder.entitelmentData", "array", "`data.taskOrder.entitelmentData`", "string"), ("data.taskOrder.serviceMember.serviceMemberIdentifier", "string", "`data.taskOrder.serviceMember.serviceMemberIdentifier`", "string"), ("data.taskOrder.serviceMember.serviceMemberTitle", "string", "`data.taskOrder.serviceMember.serviceMemberTitle`", "string"), ("data.taskOrder.serviceMember.serviceMemberFirstName", "string", "`data.taskOrder.serviceMember.serviceMemberFirstName`", "string"), ("data.taskOrder.serviceMember.serviceMemberMiddleName", "string", "`data.taskOrder.serviceMember.serviceMemberMiddleName`", "string"), ("data.taskOrder.serviceMember.serviceMemberLastName", "string", "`data.taskOrder.serviceMember.serviceMemberLastName`", "string"), ("data.taskOrder.serviceMember.serviceMemberlastFourSSN", "string", "`data.taskOrder.serviceMember.serviceMemberlastFourSSN`", "string"), ("data.taskOrder.serviceMember.serviceMemberGender", "string", "`data.taskOrder.serviceMember.serviceMemberGender`", "string"), ("data.taskOrder.serviceMember.serviceUserDODIdentifier", "string", "`data.taskOrder.serviceMember.serviceUserDODIdentifier`", "string"), ("data.taskOrder.serviceMember.serviceMemberRank", "string", "`data.taskOrder.serviceMember.serviceMemberRank`", "string"), ("data.taskOrder.serviceMember.contactInfo.email", "string", "`data.taskOrder.serviceMember.contactInfo.email`", "string"), ("data.taskOrder.serviceMember.contactInfo.alternateEmail", "string", "`data.taskOrder.serviceMember.contactInfo.alternateEmail`", "string"), ("data.taskOrder.serviceMember.contactInfo.primaryCommunicationPref", "string", "`data.taskOrder.serviceMember.contactInfo.primaryCommunicationPref`", "string"), ("data.taskOrder.serviceMember.contactInfo.secondaryCommunicationPref", "string", "`data.taskOrder.serviceMember.contactInfo.secondaryCommunicationPref`", "string"), ("data.taskOrder.serviceMember.contactInfo.contactNumber", "array", "`data.taskOrder.serviceMember.contactInfo.contactNumber`", "string"), ("data.taskOrder.authorizedRepresentative.authRepIdentifier", "string", "`data.taskOrder.authorizedRepresentative.authRepIdentifier`", "string"), ("data.taskOrder.authorizedRepresentative.authRepTitle", "string", "`data.taskOrder.authorizedRepresentative.authRepTitle`", "string"), ("data.taskOrder.authorizedRepresentative.authRepFirstName", "string", "`data.taskOrder.authorizedRepresentative.authRepFirstName`", "string"), ("data.taskOrder.authorizedRepresentative.authRepMiddleName", "string", "`data.taskOrder.authorizedRepresentative.authRepMiddleName`", "string"), ("data.taskOrder.authorizedRepresentative.authRepLastName", "string", "`data.taskOrder.authorizedRepresentative.authRepLastName`", "string"), ("data.taskOrder.authorizedRepresentative.authRepEffectiveStartDate", "string", "`data.taskOrder.authorizedRepresentative.authRepEffectiveStartDate`", "string"), ("data.taskOrder.authorizedRepresentative.authRepEffectiveEndDate", "string", "`data.taskOrder.authorizedRepresentative.authRepEffectiveEndDate`", "string"), ("data.taskOrder.authorizedRepresentative.authRepGender", "string", "`data.taskOrder.authorizedRepresentative.authRepGender`", "string"), ("data.taskOrder.authorizedRepresentative.contactInfo.email", "string", "`data.taskOrder.authorizedRepresentative.contactInfo.email`", "string"), ("data.taskOrder.authorizedRepresentative.contactInfo.alternateEmail", "string", "`data.taskOrder.authorizedRepresentative.contactInfo.alternateEmail`", "string"), ("data.taskOrder.authorizedRepresentative.contactInfo.primaryCommunicationPref", "string", "`data.taskOrder.authorizedRepresentative.contactInfo.primaryCommunicationPref`", "string"), ("data.taskOrder.authorizedRepresentative.contactInfo.secondaryCommunicationPref", "string", "`data.taskOrder.authorizedRepresentative.contactInfo.secondaryCommunicationPref`", "string"), ("data.taskOrder.authorizedRepresentative.contactInfo.contactNumber", "array", "`data.taskOrder.authorizedRepresentative.contactInfo.contactNumber`", "string"), ("data.taskOrder.orderingOfficer.orderingOfficerIdentifier", "string", "`data.taskOrder.orderingOfficer.orderingOfficerIdentifier`", "string"), ("data.taskOrder.orderingOfficer.orderingOfficerTitle", "string", "`data.taskOrder.orderingOfficer.orderingOfficerTitle`", "string"), ("data.taskOrder.orderingOfficer.orderingOfficerFirstName", "string", "`data.taskOrder.orderingOfficer.orderingOfficerFirstName`", "string"), ("data.taskOrder.orderingOfficer.orderingOfficerMiddleName", "string", "`data.taskOrder.orderingOfficer.orderingOfficerMiddleName`", "string"), ("data.taskOrder.orderingOfficer.orderingOfficerLastName", "string", "`data.taskOrder.orderingOfficer.orderingOfficerLastName`", "string"), ("data.taskOrder.orderingOfficer.orderingOfficerGender", "string", "`data.taskOrder.orderingOfficer.orderingOfficerGender`", "string"), ("data.taskOrder.orderingOfficer.orderingOfficerDODIdentifier", "string", "`data.taskOrder.orderingOfficer.orderingOfficerDODIdentifier`", "string"), ("data.taskOrder.orderingOfficer.contactInfo.email", "string", "`data.taskOrder.orderingOfficer.contactInfo.email`", "string"), ("data.taskOrder.orderingOfficer.contactInfo.alternateEmail", "string", "`data.taskOrder.orderingOfficer.contactInfo.alternateEmail`", "string"), ("data.taskOrder.orderingOfficer.contactInfo.primaryCommunicationPref", "string", "`data.taskOrder.orderingOfficer.contactInfo.primaryCommunicationPref`", "string"), ("data.taskOrder.orderingOfficer.contactInfo.secondaryCommunicationPref", "string", "`data.taskOrder.orderingOfficer.contactInfo.secondaryCommunicationPref`", "string"), ("data.taskOrder.orderingOfficer.contactInfo.contactNumber", "array", "`data.taskOrder.orderingOfficer.contactInfo.contactNumber`", "string"), ("data.taskOrder.addressInfo.destinationAddress.addressLine1", "string", "`data.taskOrder.addressInfo.destinationAddress.addressLine1`", "string"), ("data.taskOrder.addressInfo.destinationAddress.addressLine2", "string", "`data.taskOrder.addressInfo.destinationAddress.addressLine2`", "string"), ("data.taskOrder.addressInfo.destinationAddress.addressLine3", "string", "`data.taskOrder.addressInfo.destinationAddress.addressLine3`", "string"), ("data.taskOrder.addressInfo.destinationAddress.longitude", "string", "`data.taskOrder.addressInfo.destinationAddress.longitude`", "string"), ("data.taskOrder.addressInfo.destinationAddress.latitude", "string", "`data.taskOrder.addressInfo.destinationAddress.latitude`", "string"), ("data.taskOrder.addressInfo.destinationAddress.city", "string", "`data.taskOrder.addressInfo.destinationAddress.city`", "string"), ("data.taskOrder.addressInfo.destinationAddress.county", "string", "`data.taskOrder.addressInfo.destinationAddress.county`", "string"), ("data.taskOrder.addressInfo.destinationAddress.stateOrProvince", "string", "`data.taskOrder.addressInfo.destinationAddress.stateOrProvince`", "string"), ("data.taskOrder.addressInfo.destinationAddress.postalCode", "string", "`data.taskOrder.addressInfo.destinationAddress.postalCode`", "string"), ("data.taskOrder.addressInfo.destinationAddress.countryCode", "string", "`data.taskOrder.addressInfo.destinationAddress.countryCode`", "string"), ("data.taskOrder.addressInfo.originAddress.addressLine1", "string", "`data.taskOrder.addressInfo.originAddress.addressLine1`", "string"), ("data.taskOrder.addressInfo.originAddress.addressLine2", "string", "`data.taskOrder.addressInfo.originAddress.addressLine2`", "string"), ("data.taskOrder.addressInfo.originAddress.addressLine3", "string", "`data.taskOrder.addressInfo.originAddress.addressLine3`", "string"), ("data.taskOrder.addressInfo.originAddress.longitude", "string", "`data.taskOrder.addressInfo.originAddress.longitude`", "string"), ("data.taskOrder.addressInfo.originAddress.latitude", "string", "`data.taskOrder.addressInfo.originAddress.latitude`", "string"), ("data.taskOrder.addressInfo.originAddress.city", "string", "`data.taskOrder.addressInfo.originAddress.city`", "string"), ("data.taskOrder.addressInfo.originAddress.county", "string", "`data.taskOrder.addressInfo.originAddress.county`", "string"), ("data.taskOrder.addressInfo.originAddress.stateOrProvince", "string", "`data.taskOrder.addressInfo.originAddress.stateOrProvince`", "string"), ("data.taskOrder.addressInfo.originAddress.postalCode", "string", "`data.taskOrder.addressInfo.originAddress.postalCode`", "string"), ("data.taskOrder.addressInfo.originAddress.countryCode", "string", "`data.taskOrder.addressInfo.originAddress.countryCode`", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]


df=datasource0.toDF()


schema = ArrayType(MapType(StringType(),StringType()))


root_df=df.select("value.employee.employeeid","value.employee.employeename",\
         "value.employee.contactNumber")\
  .withColumn("contactNumber",F.explode(F.from_json(F.regexp_replace\
                                          (F.regexp_replace("contactNumber","([\\w-]+)", "\"$1\"")\
                                           ,"\=",":"),schema)))\
  .select("employeeid","employeename","contactNumber.type","contactNumber.phoneNumber")\
  .show(truncate=False)



applymapping1 = ApplyMapping.apply(frame = root_df, mappings = [("`employeeid`", "string", "employeeid", "string"),
("`employeename`", "string", "employeename", "string"),
("`contactNumber.type`", "string", "type", "string"),
("`contactNumber.phoneNumber`", "string", "phone", "string"),

 ], transformation_ctx = "applymapping1") 



datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping1, catalog_connection = "test", connection_options = {"dbtable": "employee", "database": "testdb"}, transformation_ctx = "datasink1")

job.commit()

Но я получаю следующую ошибку:

pyspark. sql .utils.AnalysisException: 'не удается разрешить \' regexp_replace ( contactNumber, \ '([\\ w -] +) \', \ '"$ 1" \') \ 'из-за несоответствия типа данных: для аргумента 1 требуется строковый тип, однако \' contactNumber \ ' of array> type. ;; \ n \ 'Проект [taskOrderIdentifier # 14, explode (jsontostructs (ArrayType (MapType (StringType, StringType, true), true), true), regexp_replace (regexp_replace (contactNumber # 15, ([\ w -]] + ), "$ 1"), \ =, :), Some (UT C))) AS contactNumber.

Сообщите, пожалуйста, что здесь не так.

...