Разработка внешнего шаблона снежинки из значений схемы из S3 с использованием PySpark - PullRequest
0 голосов
/ 13 октября 2019

У меня есть ситуация, когда мне нужно выбрать файл из папки s3 по папке, загрузить схему и подготовить шаблон sql из того же. Проблема в том, как использовать значения, которые поступают в виде метаданных из фрейма данных.

import sys
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import boto3              
spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
JasonPath= "s3://a**-*******-tmt-export/tmtgpi_wfmr_pos_copy_only/tmtgpi_wfmr_pos_copy_only_dbo_address/run-1570127171098-part-r-00001"
df = spark.read.csv(JasonPath, header=True, sep=",", inferSchema="true")
df.printSchema()
root
 |-- address_id: integer (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- address_mnemonic: string (nullable = true)
 |-- address_line_1: string (nullable = true)
 |-- address_line_2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- postal_code: integer (nullable = true)
 |-- last_modified_user_id: integer (nullable = true)
 |-- last_modified_timestamp: timestamp (nullable = true)
 |-- home_phone: string (nullable = true)
 |-- work_phone: string (nullable = true)
 |-- cell_phone: string (nullable = true)
 |-- pager: string (nullable = true)
 |-- e_mail: string (nullable = true)
 |-- fax_number: string (nullable = true)
 |-- data_guid: string (nullable = true)
 |-- auto_po_fax_number: string (nullable = true)
 |-- county: string (nullable = true)
`

Теперь мне нужно использовать эти значения в printSchema и сопоставить их с шаблоном sql, который inturn генерируется как файл .sql

OutPut: (файл .sql)

Тип данных и его имя автоматически выбирают значения схемы формы.


    USE ROLE SYSADMIN;
    USE WAREHOUSE TMT_LOAD_WH;
    -- Define Database
    USE SCHEMA AAP_TMT_DEV_DB.PUBLIC;
    -- Create Data File Format Rules
    CREATE FILE FORMAT IF NOT EXISTS AAP_TMT_DEV_DB.PUBLIC.CSV TYPE = 'CSV' SKIP_HEADER= 1 COMPRESSION = 'AUTO';
    CREATE  EXTERNAL TABLE IF NOT EXISTS TMT_GPL_WFMR_POS_COPY_ONLY
    (address_id INTEGER AS (value:c1::INTEGER),
    client_id INTEGER AS (value:c11::INTEGER),
    name STRING AS (value:c12::STRING),
    address_mnemonic STRING AS (value:c13::STRING),
    address_line_1 STRING AS (value:c14::STRING),
    address_line_2 STRING AS (value:c15::STRING),
    city STRING AS (value:c16::STRING),
    state_code INTEGER AS (value:c17::INTEGER),
    country_code INTEGER AS (value:c18::INTEGER),
    postal_code STRING AS (value:c19::STRING),
    last_modified_use STRING AS (value:c2::STRING)
    )
    WITH LOCATION =@S3_TMT_DEV_STAGE1/tmtgpi_wfmr_pos_copy_only/tmtgpi_wfmr_pos_copy_only_dbo_address
    FILE_FORMAT = CSV auto_refresh = true;
    alter external table TMT_GPL_WFMR_POS_COPY_ONLY refresh;


Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...