Мне нужно папка за папкой выбирать файлы из бакета S3, считывать схему каждого файла и на её основе формировать шаблон SQL. Проблема в том, как использовать значения (имена и типы столбцов), которые доступны в виде метаданных DataFrame.
import sys
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import boto3
# Build (or reuse) the Spark session that the rest of the script works with.
spark = (
    SparkSession.builder
    .appName("myApp")
    .getOrCreate()
)
sc = spark.sparkContext

# SQLContext wrapper created on top of the session's SparkContext.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# boto3 handles for S3: the low-level client and the resource-style API.
s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')

# NOTE: despite the variable name, this path is read as comma-separated CSV
# (see the spark.read.csv call right below).
JasonPath= "s3://a**-*******-tmt-export/tmtgpi_wfmr_pos_copy_only/tmtgpi_wfmr_pos_copy_only_dbo_address/run-1570127171098-part-r-00001"
df = spark.read.csv(
    JasonPath,
    sep=",",
    header=True,
    inferSchema="true",
)

# Print the inferred column names and types.
df.printSchema()
root
|-- address_id: integer (nullable = true)
|-- client_id: integer (nullable = true)
|-- name: string (nullable = true)
|-- address_mnemonic: string (nullable = true)
|-- address_line_1: string (nullable = true)
|-- address_line_2: string (nullable = true)
|-- city: string (nullable = true)
|-- state_code: string (nullable = true)
|-- country_code: string (nullable = true)
|-- postal_code: integer (nullable = true)
|-- last_modified_user_id: integer (nullable = true)
|-- last_modified_timestamp: timestamp (nullable = true)
|-- home_phone: string (nullable = true)
|-- work_phone: string (nullable = true)
|-- cell_phone: string (nullable = true)
|-- pager: string (nullable = true)
|-- e_mail: string (nullable = true)
|-- fax_number: string (nullable = true)
|-- data_guid: string (nullable = true)
|-- auto_po_fax_number: string (nullable = true)
|-- county: string (nullable = true)
`
Теперь мне нужно взять эти значения из вывода printSchema (имена столбцов и их типы) и подставить их в шаблон SQL, который, в свою очередь, сохраняется как файл .sql
Ожидаемый результат (файл .sql):
Имена столбцов и их типы данных должны автоматически подставляться из схемы.
-- Snowflake script: expose the exported address CSV files in the S3 stage as
-- an external table whose columns mirror the schema inferred for the file.
USE ROLE SYSADMIN;
USE WAREHOUSE TMT_LOAD_WH;
-- Define Database
USE SCHEMA AAP_TMT_DEV_DB.PUBLIC;
-- Create Data File Format Rules
CREATE FILE FORMAT IF NOT EXISTS AAP_TMT_DEV_DB.PUBLIC.CSV TYPE = 'CSV' SKIP_HEADER= 1 COMPRESSION = 'AUTO';
-- For CSV files, value:cN refers to the N-th field of a row (1-based), so the
-- numbers below follow the column order of the file (c1, c2, ... c11), and each
-- type matches the printed schema (integer -> INTEGER, string -> STRING).
-- Only the first 11 of the 21 columns in the printed schema are mapped here.
CREATE EXTERNAL TABLE IF NOT EXISTS TMT_GPL_WFMR_POS_COPY_ONLY
(address_id INTEGER AS (value:c1::INTEGER),
client_id INTEGER AS (value:c2::INTEGER),
name STRING AS (value:c3::STRING),
address_mnemonic STRING AS (value:c4::STRING),
address_line_1 STRING AS (value:c5::STRING),
address_line_2 STRING AS (value:c6::STRING),
city STRING AS (value:c7::STRING),
state_code STRING AS (value:c8::STRING),
country_code STRING AS (value:c9::STRING),
postal_code INTEGER AS (value:c10::INTEGER),
last_modified_user_id INTEGER AS (value:c11::INTEGER)
)
WITH LOCATION =@S3_TMT_DEV_STAGE1/tmtgpi_wfmr_pos_copy_only/tmtgpi_wfmr_pos_copy_only_dbo_address
FILE_FORMAT = CSV auto_refresh = true;
-- Manually refresh the external table metadata so existing files are registered.
alter external table TMT_GPL_WFMR_POS_COPY_ONLY refresh;