Я работаю с 7 различными фреймами данных, где 3 взяты из CSV-файлов, хранящихся в хранилищах контейнеров и больших двоичных объектов, а остальные 4 - из запросов в sqldw.Эти кадры данных поступают из разных источников, но имеют одинаковую структуру
Я выполняю объединение после объединения и удаляю нулевые значения, добавляя столбец, чтобы зарегистрировать, к какому источнику он пришелот.Это все довольно просто, но для моего последнего я выполняю соединение от государства, а затем нечеткое совпадение с именем.
В моем фрейме данных содержится около 500 тыс. Строк, но после этого объединения оно достигает 30 миллионов, а для выполнения объединения и нечетких совпадений требуется более 30 минут.
Я читал о таблицах Delta и о том, как он действительно может обрабатывать петабайты.быстро.Я действительно не понял, как это работает.Я пытался создать дельта-файлы и дельта-таблицу из учебников, но когда я запустил код, потребовалось более часа, чтобы создать дельта-таблицу.
Может кто-нибудь помочь мне оптимизировать это?Либо используя паркет или дельта-стол, не уверен, что лучше.
Мои подключения к ноутбуку настроены следующим образом:
### STORAGE ACCOUNT ###
storage_account_name = "[storageAccount]"
container_name = "[containerName]"
storage_account_access_key = "[key1]"
temp_blob_name = "[blobPath]"
storage_account_account_key = "fs.azure.account.key.{}.blob.core.windows.net".format(storage_account_name)
inputSource = "wasbs://{}@{}.blob.core.windows.net".format(container_name, storage_account_name)
spark.conf.set(
storage_account_account_key,
storage_account_access_key)
spark._jsc.hadoopConfiguration().set(
storage_account_account_key,
storage_account_account_key)
### DW DATABASE ###
dwDatabase = "[database]"
dwServer = "[server]"
dwJdbcPort = "[port]"
dwUser = "[user]"
dwPass = "[password]"
dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
spark.conf.set(
"spark.sql.parquet.writeLegacyFormat",
"true")
И для чтения моих DataFrames:
### CSVs ###
df_csv = spark.read \
.format('csv') \
.option("inferSchema", "true") \
.option("delimiter", ";") \
.option("header", "true") \
.load(inputSource + "/sftp_files/{}/{}".format(dateFolder, fileName))
### SQL ###
df_sql = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", sqlDwUrlSmall) \
.option("tempDir", inputSource + temp_blob_name) \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("query", queryText) \
.load()
После нечеткого матча у меня это:
df = df\
.orderBy(['Name','State','bestScore'], ascending=[True,True,False], na_position='last')\
.drop_duplicates(subset=['ID'])
threshold = 85
df = df.filter('bestScore >= {}'.format(threshold))