Повышение производительности объединения и нечеткого анализа в PySpark или Delta Table с использованием блоков данных - PullRequest
0 голосов
/ 13 июня 2019

Я работаю с 7 различными фреймами данных, где 3 взяты из CSV-файлов, хранящихся в хранилищах контейнеров и больших двоичных объектов, а остальные 4 - из запросов в sqldw.Эти кадры данных поступают из разных источников, но имеют одинаковую структуру

enter image description here

Я выполняю объединение после объединения и удаляю нулевые значения, добавляя столбец, чтобы зарегистрировать, к какому источнику он пришелот.Это все довольно просто, но для моего последнего я выполняю соединение от государства, а затем нечеткое совпадение с именем.

enter image description here

В моем фрейме данных содержится около 500 тыс. Строк, но после этого объединения оно достигает 30 миллионов, а для выполнения объединения и нечетких совпадений требуется более 30 минут.

Я читал о таблицах Delta и о том, как он действительно может обрабатывать петабайты.быстро.Я действительно не понял, как это работает.Я пытался создать дельта-файлы и дельта-таблицу из учебников, но когда я запустил код, потребовалось более часа, чтобы создать дельта-таблицу.

Может кто-нибудь помочь мне оптимизировать это?Либо используя паркет или дельта-стол, не уверен, что лучше.

Мои подключения к ноутбуку настроены следующим образом:

### STORAGE ACCOUNT ###
storage_account_name = "[storageAccount]"
container_name = "[containerName]"
storage_account_access_key = "[key1]"
temp_blob_name = "[blobPath]"

storage_account_account_key = "fs.azure.account.key.{}.blob.core.windows.net".format(storage_account_name)
inputSource = "wasbs://{}@{}.blob.core.windows.net".format(container_name, storage_account_name)

spark.conf.set(
    storage_account_account_key,
    storage_account_access_key)

spark._jsc.hadoopConfiguration().set(
    storage_account_account_key,
    storage_account_account_key)

### DW DATABASE ###
dwDatabase = "[database]"
dwServer = "[server]"
dwJdbcPort = "[port]"

dwUser = "[user]"
dwPass = "[password]"

dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass

spark.conf.set(
    "spark.sql.parquet.writeLegacyFormat",
    "true")

И для чтения моих DataFrames:

### CSVs ###
df_csv = spark.read \
    .format('csv') \
    .option("inferSchema", "true") \
    .option("delimiter", ";") \
    .option("header", "true") \
    .load(inputSource + "/sftp_files/{}/{}".format(dateFolder, fileName))

### SQL ###
df_sql = spark.read \
    .format("com.databricks.spark.sqldw") \
    .option("url", sqlDwUrlSmall) \
    .option("tempDir", inputSource + temp_blob_name) \
    .option("forwardSparkAzureStorageCredentials", "true") \
    .option("query", queryText) \
    .load()

После нечеткого матча у меня это:

df = df\
    .orderBy(['Name','State','bestScore'], ascending=[True,True,False], na_position='last')\
    .drop_duplicates(subset=['ID'])
threshold = 85
df = df.filter('bestScore >= {}'.format(threshold))
...