Я хочу сделать медленно меняющееся измерение в блоках данных. Мой исходный фрейм данных содержит следующую информацию.
+-------------------+-------------------------+----------+-----------+-------------+
| actionimmediately | date | deviceid | patchguid | status |
+-------------------+-------------------------+----------+-----------+-------------+
| False | 2018-08-15 04:01:00.000 | 123 | 00-001 | Install |
| True | 2018-08-16 00:00:00.000 | 123 | 00-001 | Install |
| False | 2018-08-10 01:00:00.000 | 123 | 00-001 | Not Approved|
| False | 2020-01-01 00:00:00.000 | 333 | 11-111 | Declined |
+-------------------+-------------------------+----------+-----------+-------------+
Фрейм данных, который я хочу в качестве вывода, выглядит так:
+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+
| mergekey | deviceid | patchguid | status | actionimmediately | starttime | endtime | current |
+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+
| 12300-001 | 123 | 00-001 | Not Approved | False | 2018-08-10 01:00:00.000 | 2018-08-15 04:01:00.000 | False |
| 12300-001 | 123 | 00-001 | Install | False | 2018-08-15 04:01:00.000 | 2018-08-16 00:00:00.000 | False |
| 12300-001 | 123 | 00-001 | Install | True | 2018-08-16 00:00:00.000 | null | True |
| 33311-111 | 333 | 11-111 | Declined | False | 2020-01-01 00:00:00.000 | null | True |
+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+
На самом деле исходный файл содержит 275475 строк
Я пробовал Уже 2 решения, но оба работают очень медленно. Как + -10h.
Решение 1. Использование слияния Delta Lake
Сначала я создаю seqId, который использую позже для итерации. Это связано с тем, что слияние не может обновлять одну и ту же строку несколько раз. Я создаю seqId с помощью окна.
source_df = source_df.withColumn('mergekey',concat(col('deviceid'),col('patchguid')))
w1 = Window.partitionBy('mergekey').orderBy('date')
source_df = source_df.withColumn('seqid', row_number().over(w1))
Затем я создаю for l oop, который проходит по каждому seqId и объединяет строки. На самом деле max_seq_id равен 1900
def createTable (df, SeqId):
df\
.withColumn('mergekey',concat(col('deviceid'),col('patchguid')))\
.select(\
'mergekey',\
'deviceid',\
'patchguid',\
'status',\
'actionimmediately',\
col('date').alias('starttime'))\
.where(col('seqid') == SeqId)\
.withColumn('endtime',lit(None).cast('timestamp'))\
.withColumn('current',lit(True))\
.write.format('delta')\
.partitionBy("current")\
.options(header='true',path='/mnt/destinationncentral/patch_approval')\
.saveAsTable('patch_approval')
def MergePatchApproval (df,deltatable,seqNummer):
dataframe = df.where(col('seqid') == seqNummer)
newToInsert = dataframe.alias('updates')\
.join(deltatable.toDF().alias('table'),['deviceid','patchguid'])\
.select(\
'updates.actionimmediately',\
'updates.date',\
'updates.deviceid',\
'updates.patchguid',\
'updates.status',\
'updates.seqid')\
.where('table.current = true and \
(table.status <> updates.status or table.actionimmediately <> updates.actionimmediately)')
stagedUpdates = (newToInsert.selectExpr('NULL as mergekey','*')\
.union(dataframe\
.withColumn('mergekey',concat(col('deviceid'),col('patchguid')))\
.select(\
'mergekey',\
'actionimmediately',\
'date',\
'deviceid',\
'patchguid',\
'status',\
'seqid')))
deltatable.alias('t')\
.merge(stagedUpdates.alias('s'),'t.current = true and t.mergekey = s.mergekey')\
.whenMatchedUpdate(condition = 't.current = true and \
(t.status <> s.status or t.actionimmediately <> s.actionimmediately)', \
set = {
'endtime':'s.date',
'current':'false'
}).whenNotMatchedInsert(values = {
'mergekey':'s.mergekey',
'deviceid':'s.deviceid',
'patchguid':'s.patchguid',
'status':'s.status',
'actionimmediately':'s.actionimmediately',
'starttime':'s.date',
'endtime':'NULL',
'current':'true'
}).execute()
for i in range(max_seq_id):
i = i + 1
print(i)
df = source_df.where(col('seqid') == i)
if(i == 1):
tablecount = spark.sql("show tables like 'patch_approval'").count()
if(tablecount == 0):
createTable(df,i)
approval_table = DeltaTable.forPath(spark,'/mnt/destinationncentral/patch_approval')
else:
approval_table = DeltaTable.forPath(spark,'/mnt/destinationncentral/patch_approval')
MergePatchApproval(df,approval_table,i)
else:
MergePatchApproval(df,approval_table,i)
Проблема, с которой я столкнулся с этим решением, заключается в том, что время для записи данных в озеро данных azure занимает некоторое время, что нормально, я думаю, но также время выполнения на каждую итерацию увеличивается.
Решение 2. Обновите фреймы данных и напишите один раз в конце
В этом решении я также использую для l oop и seqId, но вместо того, чтобы писать каждый l oop to azure data lake Я делаю это только в конце. Это решение решает проблему задержки записи, но время для каждого l oop до конца все еще увеличивается.
def createDestDF(sourceDF):
dest_df = sourceDF\
.select(\
'mergekey',\
'deviceid',\
'patchguid',\
'status',\
'actionimmediately',\
col('date').alias('starttime'))\
.withColumn('endtime',lit(None).cast('timestamp'))\
.withColumn('current',lit(True))
return dest_df
def getChangedRecords(sourceDF,destDF):
changedRecords = sourceDF.alias('u')\
.join(destDF.alias('t'),['deviceid','patchguid'])\
.select(\
'u.actionimmediately',\
'u.date',\
'u.deviceid',\
'u.patchguid',\
'u.status',\
'u.seqid',\
'u.mergekey')\
.where('t.current = true and \
(t.status <> u.status or t.actionimmediately <> u.actionimmediately)')
return changedRecords
def getNewRecords(sourceDF,destDF):
newRecords = sourceDF.alias('n')\
.join(destDF.alias('t'),['deviceid','patchguid'],'left')\
.select(\
't.mergekey',\
'n.actionimmediately',\
'n.date',\
'deviceid',\
'patchguid',\
'n.status',\
'n.seqid')\
.where('t.current is null')
return newRecords
def upsertChangedRecords(sourceDF,destDF):
endTimeColumn = expr("""IF(endtimeOld IS NULL, date, endtimeOld)""")
currentColumn = expr("""IF(date IS NULL, currentOld, False)""")
updateDF = sourceDF.alias('s').join(destDF.alias('t'),'mergekey','right').select(\
'mergekey',\
't.deviceid',\
't.patchguid',\
't.status',\
't.actionimmediately',\
't.starttime',\
's.date',\
col('t.current').alias('currentOld'),\
col('t.endTime').alias('endtimeOld'))\
.withColumn('endtime',endTimeColumn)\
.withColumn('current',currentColumn)\
.drop('currentOld','date','endTimeOld')
updateInsertDF = sourceDF\
.select(\
'mergekey',\
'deviceid',\
'patchguid',\
'status',\
'actionimmediately',\
col('date').alias('starttime'))\
.withColumn('endtime',lit(None).cast('timestamp'))\
.withColumn('current',lit(True))
resultDF = updateDF.union(updateInsertDF)
return resultDF
def insertNewRecords(sourceDF, destDF):
insertDF = sourceDF\
.select(\
'mergekey',\
'deviceid',\
'patchguid',\
'status',\
'actionimmediately',\
col('date').alias('starttime'))\
.withColumn('endtime',lit(None).cast('timestamp'))\
.withColumn('current',lit(True))
resultDF = destDF.union(insertDF)
return resultDF
for i in range(max_seq_id):
i = i + 1
print(i)
seq_df = source_df.where(col('seqid') == i)
if i == 1:
tablecount = spark.sql("show tables like 'patch_approval'").count()
if(tablecount == 0):
dest_df = createDestDF(seq_df)
else:
changed_df = getChangedRecords(seq_df,dest_df)
new_df = getNewRecords(seq_df,dest_df)
dest_df = upsertChangedRecords(changed_df,dest_df)
dest_df = insertNewRecords(new_df,dest_df)
else:
changed_df = getChangedRecords(seq_df,dest_df)
new_df = getNewRecords(seq_df,dest_df)
dest_df = upsertChangedRecords(changed_df,dest_df)
dest_df = insertNewRecords(new_df,dest_df)
dest_df\
.write\
.format('delta')\
.partitionBy('current')\
.mode('overwrite')\
.options(header='true',path='/mnt/destinationncentral/patch_approval')\
.saveAsTable('patch_approval')
Любая идея, как я могу решить проблему увеличения времени выполнения в for l oop?
С уважением,