Многопроцессорный подход Pyspark или оптимизация кода - PullRequest
1 голос
/ 23 января 2020

Не могли бы вы помочь мне по приведенному ниже случаю в коде pyspark 2.4.3 блока данных, который обрабатывает терабайты данных json, считанных из корзины aws s3.

case 1:

В настоящее время я читаю несколько таблиц последовательно, чтобы получить количество дней для каждой таблицы.

Например, в table_list.csv есть один столбец с несколькими именами таблиц

year = 2019 month = 12

tablesDF = spark.read.format("csv").option("header",false).load("s3a://bucket//source/table_list.csv")
tabList = tablesDF.toPandas().values.tolist()
for table in tabList:
    tab_name = table[0]

// Настройки снежинок и количество таблиц снежинок ()

sfOptions = dict(
  "sfURL" -> "",
  "sfAccount" -> "",
  "sfUser" -> "",
  "sfPassword" -> "",
  "sfDatabase" -> "",
  "sfSchema" -> "",
  "sfWarehouse" -> "",
)

// Read data as dataframe

sfxdf = spark.read
  .format("snowflake")
  .options(**sfOptions)
  .option("query", "select y as year,m as month,count(*) as sCount from
{} where y={} and m={} group by year,month").format(tab_name,year,month)
      .load()

// блоки данных delta lake

     dbxDF = spark.sql("select y as year,m as month,count(*) as dCount from
db.{} where y={} and m={}" group by year,month).format(tab_name,year,month)

resultDF = dbxDF.join(sfxdf, on=['year', 'month'], how='left_outer'
).na.fill(0).withColumn("flag_col", expr("dCount == sCount"))


    finalDF = resultDF.withColumn("table_name",
lit(tab_name)).select("table_name","year","month","dCount","sCount","flag_col")


    finalDF.coalesce(1).write.format('csv').option('header', 'true').save("s3a://outputs/reportcsv)

Вопросы:

1) В настоящее время я выполняю основанный на последовательности запуск с использованием запроса количества, принимая etables одним.

2) Как читать все таблицы из файла csv параллельно и выполнять запрос счета параллельно и распределить задания по кластеру?

3) Не могли бы вы показать мне, как оптимизировать приведенный выше код в pyspark для многопоточной обработки всех запросов счета одновременно?

Спасибо Анбу

...