Не могли бы вы помочь мне по приведенному ниже случаю в коде pyspark 2.4.3 блока данных, который обрабатывает терабайты данных json, считанных из корзины aws s3.
case 1:
В настоящее время я читаю несколько таблиц последовательно, чтобы получить количество дней для каждой таблицы.
Например, в table_list.csv есть один столбец с несколькими именами таблиц
year = 2019 month = 12
tablesDF = spark.read.format("csv").option("header",false).load("s3a://bucket//source/table_list.csv")
tabList = tablesDF.toPandas().values.tolist()
for table in tabList:
tab_name = table[0]
// Настройки снежинок и количество таблиц снежинок ()
sfOptions = dict(
"sfURL" -> "",
"sfAccount" -> "",
"sfUser" -> "",
"sfPassword" -> "",
"sfDatabase" -> "",
"sfSchema" -> "",
"sfWarehouse" -> "",
)
// Read data as dataframe
sfxdf = spark.read
.format("snowflake")
.options(**sfOptions)
.option("query", "select y as year,m as month,count(*) as sCount from
{} where y={} and m={} group by year,month").format(tab_name,year,month)
.load()
// блоки данных delta lake
dbxDF = spark.sql("select y as year,m as month,count(*) as dCount from
db.{} where y={} and m={}" group by year,month).format(tab_name,year,month)
resultDF = dbxDF.join(sfxdf, on=['year', 'month'], how='left_outer'
).na.fill(0).withColumn("flag_col", expr("dCount == sCount"))
finalDF = resultDF.withColumn("table_name",
lit(tab_name)).select("table_name","year","month","dCount","sCount","flag_col")
finalDF.coalesce(1).write.format('csv').option('header', 'true').save("s3a://outputs/reportcsv)
Вопросы:
1) В настоящее время я выполняю основанный на последовательности запуск с использованием запроса количества, принимая etables одним.
2) Как читать все таблицы из файла csv параллельно и выполнять запрос счета параллельно и распределить задания по кластеру?
3) Не могли бы вы показать мне, как оптимизировать приведенный выше код в pyspark для многопоточной обработки всех запросов счета одновременно?
Спасибо Анбу