Преобразование представления БД Oracle в Spark SQL - PullRequest
0 голосов
/ 18 декабря 2018

У меня проблемы с переводом представления БД Oracle в Spark SQL, работающий на AWS Glue.Исходное представление Oracle:

CREATE OR REPLACE FORCE VIEW "DMS_B2B"."OPERATION_CODE_VW" ("SECTION_NUMBER", "PAGE_NUMBER", "OPERATION_CODE", "COMBINATION_CODE", "RECORD_TYPE", "OPERATION_DESCRIPTION", "REPAIR_ABBREVIATION", "TYPE_INDICATOR_CODE", "CATEGORY_CODE", "ACTION_INDICATOR", "VDS_CODE", "HOURS", "MULTIPLE_HOURS_INDICATOR", "WMI_CODE", "BRAND_IND", "DET_ACTION_INDICATOR", "CREATED_DT") AS 
  SELECT   section_number, page_number, a.operation_code, b.combination_code,
            record_type, operation_description, repair_abbreviation,
            type_indicator_code, category_code,
            NVL ((SELECT action_indicator
                    FROM stg_operation_code b
                   WHERE b.operation_code = a.operation_code
                     AND b.type_indicator_code = a.type_indicator_code
                     AND b.combination_code IS NULL),
                 a.action_indicator
                ) AS action_indicator,
            vds_code, hours, multiple_hours_indicator, wmi_code, brand_ind,
            b.action_indicator det_action_indicator, a.created_dt
       FROM (SELECT NVL (combination_code, '|') AS modcomb, tabb01.*
               FROM stg_operation_code tabb01) a,
            (SELECT NVL (combination_code, '|') AS mobcomb, tabb02.*
               FROM stg_operation_code_details tabb02) b
      WHERE a.operation_code = b.operation_code(+)
        AND a.modcomb = b.mobcomb(+)
        AND record_type = '3'
        AND (   DECODE (type_indicator_code, 'B', 'T') = brand_ind
             OR DECODE (type_indicator_code, 'B', 'L') = brand_ind
             OR type_indicator_code = brand_ind
            )
   ORDER BY section_number,
            page_number,
            a.operation_code,
            b.combination_code DESC,
            vds_code,
            wmi_code ;

Мой код PySpark SQL:

OpCodeOpCodeDetailsFullToyDF_fs = spark.sql("""
             SELECT a.SECTION_NUMBER, a.PAGE_NUMBER, a.OPERATION_CODE, b.COMBINATION_CODE, a.RECORD_TYPE,
                    a.OPERATION_DESCRIPTION, a.REPAIR_ABBREVIATION, a.TYPE_INDICATOR_CODE, a.CATEGORY_CODE,
                    CASE WHEN (SELECT FIRST(ACTION_INDICATOR) FROM OpCodeFullView b
                               WHERE OpCodeFullView.OPERATION_CODE = a.OPERATION_CODE
                               AND   OpCodeFullView.TYPE_INDICATOR_CODE = a.TYPE_INDICATOR_CODE
                               AND   OpCodeFullView.COMBINATION_CODE IS NULL) IS NULL THEN a.ACTION_INDICATOR ELSE (SELECT FIRST(ACTION_INDICATOR) FROM OpCodeFullView
                                                                                                       WHERE OpCodeFullView.OPERATION_CODE = a.OPERATION_CODE
                                                                                                       AND   OpCodeFullView.TYPE_INDICATOR_CODE = a.TYPE_INDICATOR_CODE
                                                                                                       AND   OpCodeFullView.COMBINATION_CODE IS NULL) END AS ACTION_INDICATOR,
                    b.VDS_CODE, b.HOURS, b.MULTIPLE_HOURS_INDICATOR, b.WMI_CODE, b.BRAND_IND, b.ACTION_INDICATOR AS DET_ACTION_INDICATOR,
                    a.CREATED_DT AS UTC_TIME
             FROM  (SELECT (CASE WHEN COMBINATION_CODE IS NOT NULL THEN COMBINATION_CODE ELSE '|' END) AS modcomb, tabb01.* FROM OpCodeFullView) a,
                   (SELECT (CASE WHEN COMBINATION_CODE IS NOT NULL THEN COMBINATION_CODE ELSE '|' END) AS mobcomb, tabb02.* FROM OpCodeDetailFullToyView) b
             WHERE a.OPERATION_CODE = b.OPERATION_CODE
             AND RECORD_TYPE = '3'
             AND (CASE WHEN ((BRAND_IND == 'B') OR (BRAND_IND == 'T')
                          or (TYPE_INDICATOR_CODE == BRAND_IND)) THEN TRUE ELSE FALSE END)
             ORDER BY a.SECTION_NUMBER, a.PAGE_NUMBER, a.OPERATION_CODE, b.COMBINATION_CODE, B.VDS_CODE, B.WMI_CODE""")

Компилятор жалуется на разрешение a.OPERATION_CODE.Это код ошибки:

pyspark.sql.utils.AnalysisException: u"cannot resolve '`a.OPERATION_CODE`' given input columns: [CASE WHEN (COMBINATION_CODE IS NOT NULL) THEN COMBINATION_CODE ELSE | END, CASE WHEN (COMBINATION_CODE IS NOT NULL) THEN COMBINATION_CODE ELSE | END]; line 15 pos 19;\n'Sort ['a.SECTION_NUMBER ASC NULLS FIRST, 'a.PAGE_NUMBER ASC NULLS FIRST, 'a.OPERATION_CODE ASC NULLS FIRST, 'b.COMBINATION_CODE ASC NULLS FIRST, 'B.VDS_CODE ASC NULLS FIRST, 'B.WMI_CODE ASC NULLS FIRST], true\n+- 'Project ['a.SECTION_NUMBER, 'a.PAGE_NUMBER, 'a.OPERATION_CODE, 'b.COMBINATION_CODE, 'a.RECORD_TYPE, 'a.OPERATION_DESCRIPTION, 'a.REPAIR_ABBREVIATION, 'a.TYPE_INDICATOR_CODE, 'a.CATEGORY_CODE, CASE WHEN isnull(scalar-subquery#383 []) THEN 'a.ACTION_INDICATOR ELSE scalar-subquery#385 [] END AS ACTION_INDICATOR#386, 'b.VDS_CODE, 'b.HOURS, 'b.MULTIPLE_HOURS_INDICATOR, 'b.WMI_CODE, 'b.BRAND_IND, 'b.ACTION_INDICATOR AS DET_ACTION_INDICATOR#387, 'a.CREATED_DT AS UTC_TIME#388]\n : :- 'Project [unresolvedalias(first('ACTION_INDICATOR, false), None)]\n : : +- 'Filter ((('OpCodeFullView.OPERATION_CODE = 'a.OPERATION_CODE) && ('OpCodeFullView.TYPE_INDICATOR_CODE = 'a.TYPE_INDICATOR_CODE)) && isnull('OpCodeFullView.COMBINATION_CODE))\n : : +- 'SubqueryAlias b\n : : +- 'UnresolvedRelation `OpCodeFullView`\n : +- 'Project [unresolvedalias(first('ACTION_INDICATOR, false), None)]\n : +- 'Filter ((('OpCodeFullView.OPERATION_CODE = 'a.OPERATION_CODE) && ('OpCodeFullView.TYPE_INDICATOR_CODE = 'a.TYPE_INDICATOR_CODE)) && isnull('OpCodeFullView.COMBINATION_CODE))\n : +- 'UnresolvedRelation `OpCodeFullView`\n +- 'Filter ((('a.OPERATION_CODE = 'b.OPERATION_CODE) && ('RECORD_TYPE = 3)) && CASE WHEN ((('BRAND_IND = B) || ('BRAND_IND = T)) || ('TYPE_INDICATOR_CODE = 'BRAND_IND)) THEN true ELSE false END)\n +- Join Inner\n :- SubqueryAlias a\n : +- Project [CASE WHEN isnotnull(COMBINATION_CODE#138) THEN COMBINATION_CODE#138 ELSE | END AS CASE WHEN (COMBINATION_CODE IS NOT NULL) THEN COMBINATION_CODE ELSE | END#389]\n : +- SubqueryAlias opcodefullview\n : +- Project [SECTION_NUMBER#135, PAGE_NUMBER#136, OPERATION_CODE#137, COMBINATION_CODE#138, RECORD_TYPE#139, OPERATION_DESCRIPTION#140, REPAIR_ABBREVIATION#141, TYPE_INDICATOR_CODE#142, CATEGORY_CODE#143, ACTION_INDICATOR#144, CREATED_DT#145, CHANGE_STATUS_DESC#146, REQUEST_CODE#147, LABOROPERATIONDESCRIPTION#148, <lambda>() AS guid#164]\n : +- Project [rtrim(ltrim(sectionno#111)) AS SECTION_NUMBER#135, rtrim(ltrim(pageno#112)) AS PAGE_NUMBER#136, rtrim(ltrim(operationcode#113)) AS OPERATION_CODE#137, rtrim(ltrim(combinationcode#114)) AS COMBINATION_CODE#138, recordtype#115 AS RECORD_TYPE#139, rtrim(ltrim(description#116)) AS OPERATION_DESCRIPTION#140, rtrim(ltrim(repairabbr#117)) AS REPAIR_ABBREVIATION#141, typeindicatorcode#118 AS TYPE_INDICATOR_CODE#142, rtrim(ltrim(categorycode#119)) AS CATEGORY_CODE#143, actionindicator#120 AS ACTION_INDICATOR#144, current_timestamp() AS CREATED_DT#145, Complete AS CHANGE_STATUS_DESC#146, Complete AS REQUEST_CODE#147, CASE WHEN isnull(rtrim(ltrim(combinationcode#114))) THEN description#116 ELSE cast(null as string) END AS LABOROPERATIONDESCRIPTION#148]\n : +- SubqueryAlias ocftemptable\n : +- LogicalRDD [sectionno#111, pageno#112, operationcode#113, combinationcode#114, recordtype#115, description#116, repairabbr#117, typeindicatorcode#118, categorycode#119, actionindicator#120, filler#121]\n +- SubqueryAlias b\n +- Project [CASE WHEN isnotnull(COMBINATION_CODE#241) THEN COMBINATION_CODE#241 ELSE | END AS CASE WHEN (COMBINATION_CODE IS NOT NULL) THEN COMBINATION_CODE ELSE | END#390]\n +- SubqueryAlias opcodedetailfulltoyview\n +- Project [OPERATION_CODE#240, COMBINATION_CODE#241, VDS_CODE#242, HOURS#243, MULTIPLE_HOURS_INDICATOR#244, WMI_CODE#245, ACTION_INDICATOR#246, BRAND_IND#247, DOCUMENTID#248, <lambda>() AS guid#259]\n +- Project [rtrim(ltrim(operationcode#222)) AS OPERATION_CODE#240, rtrim(ltrim(combinationcode#223)) AS COMBINATION_CODE#241, rtrim(ltrim(vdscode#224)) AS VDS_CODE#242, rtrim(ltrim(cast(hours#225 as string))) AS HOURS#243, CASE WHEN ((rtrim(ltrim(cast(multiplehoursindicator#226 as string))) = Y) || isnull(rtrim(ltrim(cast(multiplehoursindicator#226 as string))))) THEN true ELSE false END AS MULTIPLE_HOURS_INDICATOR#244, rtrim(ltrim(wmicode#227)) AS WMI_CODE#245, actionindicator#228 AS ACTION_INDICATOR#246, T AS BRAND_IND#247, concat(concat(ShowOperationCode, from_unixtime(unix_timestamp(current_timestamp(), yyyy-MM-dd HH:mm:ss, Some(Zulu)), yyyyMMddhhmmss, Some(Zulu))), TY) AS DOCUMENTID#248]\n +- SubqueryAlias ocdfttemptable\n +- LogicalRDD [operationcode#222, combinationcode#223, vdscode#224, hours#225, multiplehoursindicator#226, wmicode#227, actionindicator#228, filler#229]\n"

Я ценю помощь, которую вы можете оказать мне.

...