Создайте таблицу SCD2 из исходного файла, которая содержит несколько обновлений для одного идентификатора, используя Databricks / Spark - PullRequest
0 голосов
/ 25 мая 2020

Я хочу сделать медленно меняющееся измерение в блоках данных. Мой исходный фрейм данных содержит следующую информацию.

+-------------------+-------------------------+----------+-----------+-------------+
| actionimmediately |          date           | deviceid | patchguid |   status    |
+-------------------+-------------------------+----------+-----------+-------------+
| False             | 2018-08-15 04:01:00.000 |      123 | 00-001    | Install     |
| True              | 2018-08-16 00:00:00.000 |      123 | 00-001    | Install     |
| False             | 2018-08-10 01:00:00.000 |      123 | 00-001    | Not Approved|
| False             | 2020-01-01 00:00:00.000 |      333 | 11-111    | Declined    |
+-------------------+-------------------------+----------+-----------+-------------+

Фрейм данных, который я хочу в качестве вывода, выглядит так:

+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+
| mergekey  | deviceid | patchguid |    status    | actionimmediately |        starttime        |         endtime         | current |
+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+
| 12300-001 |      123 | 00-001    | Not Approved | False             | 2018-08-10 01:00:00.000 | 2018-08-15 04:01:00.000 | False   |
| 12300-001 |      123 | 00-001    | Install      | False             | 2018-08-15 04:01:00.000 | 2018-08-16 00:00:00.000 | False   |
| 12300-001 |      123 | 00-001    | Install      | True              | 2018-08-16 00:00:00.000 | null                    | True    |
| 33311-111 |      333 | 11-111    | Declined     | False             | 2020-01-01 00:00:00.000 | null                    | True    |
+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+

На самом деле исходный файл содержит 275475 строк

Я пробовал Уже 2 решения, но оба работают очень медленно. Как + -10h.

Решение 1. Использование слияния Delta Lake

Сначала я создаю seqId, который использую позже для итерации. Это связано с тем, что слияние не может обновлять одну и ту же строку несколько раз. Я создаю seqId с помощью окна.

source_df = source_df.withColumn('mergekey',concat(col('deviceid'),col('patchguid')))
w1 = Window.partitionBy('mergekey').orderBy('date')
source_df = source_df.withColumn('seqid', row_number().over(w1))

Затем я создаю for l oop, который проходит по каждому seqId и объединяет строки. На самом деле max_seq_id равен 1900

def createTable (df, SeqId):
  df\
  .withColumn('mergekey',concat(col('deviceid'),col('patchguid')))\
  .select(\
          'mergekey',\
          'deviceid',\
          'patchguid',\
          'status',\
          'actionimmediately',\
          col('date').alias('starttime'))\
  .where(col('seqid') == SeqId)\
  .withColumn('endtime',lit(None).cast('timestamp'))\
  .withColumn('current',lit(True))\
  .write.format('delta')\
  .partitionBy("current")\
  .options(header='true',path='/mnt/destinationncentral/patch_approval')\
  .saveAsTable('patch_approval')

def MergePatchApproval (df,deltatable,seqNummer):
  dataframe = df.where(col('seqid') == seqNummer)
  newToInsert = dataframe.alias('updates')\
  .join(deltatable.toDF().alias('table'),['deviceid','patchguid'])\
  .select(\
          'updates.actionimmediately',\
          'updates.date',\
          'updates.deviceid',\
          'updates.patchguid',\
          'updates.status',\
          'updates.seqid')\
  .where('table.current = true and \
  (table.status <> updates.status or table.actionimmediately <> updates.actionimmediately)')

  stagedUpdates = (newToInsert.selectExpr('NULL as mergekey','*')\
                   .union(dataframe\
                          .withColumn('mergekey',concat(col('deviceid'),col('patchguid')))\
                          .select(\
                                  'mergekey',\
                                  'actionimmediately',\
                                  'date',\
                                  'deviceid',\
                                  'patchguid',\
                                  'status',\
                                  'seqid')))

  deltatable.alias('t')\
  .merge(stagedUpdates.alias('s'),'t.current = true and t.mergekey = s.mergekey')\
  .whenMatchedUpdate(condition = 't.current = true and \
  (t.status <> s.status or t.actionimmediately <> s.actionimmediately)', \
  set = {
    'endtime':'s.date',
    'current':'false'
  }).whenNotMatchedInsert(values = {
    'mergekey':'s.mergekey',
    'deviceid':'s.deviceid',
    'patchguid':'s.patchguid',
    'status':'s.status',
    'actionimmediately':'s.actionimmediately',
    'starttime':'s.date',
    'endtime':'NULL',
    'current':'true'
  }).execute()

for i in range(max_seq_id):
  i = i + 1
  print(i)
  df = source_df.where(col('seqid') == i)
  if(i == 1):
    tablecount = spark.sql("show tables like 'patch_approval'").count()
    if(tablecount == 0):
      createTable(df,i)
      approval_table = DeltaTable.forPath(spark,'/mnt/destinationncentral/patch_approval')
    else:
      approval_table = DeltaTable.forPath(spark,'/mnt/destinationncentral/patch_approval')
      MergePatchApproval(df,approval_table,i)
  else:
    MergePatchApproval(df,approval_table,i) 

Проблема, с которой я столкнулся с этим решением, заключается в том, что время для записи данных в озеро данных azure занимает некоторое время, что нормально, я думаю, но также время выполнения на каждую итерацию увеличивается.

Решение 2. Обновите фреймы данных и напишите один раз в конце

В этом решении я также использую для l oop и seqId, но вместо того, чтобы писать каждый l oop to azure data lake Я делаю это только в конце. Это решение решает проблему задержки записи, но время для каждого l oop до конца все еще увеличивается.

def createDestDF(sourceDF):
  dest_df = sourceDF\
    .select(\
            'mergekey',\
            'deviceid',\
            'patchguid',\
            'status',\
            'actionimmediately',\
            col('date').alias('starttime'))\
    .withColumn('endtime',lit(None).cast('timestamp'))\
    .withColumn('current',lit(True))
  return dest_df

def getChangedRecords(sourceDF,destDF):
  changedRecords = sourceDF.alias('u')\
  .join(destDF.alias('t'),['deviceid','patchguid'])\
  .select(\
         'u.actionimmediately',\
         'u.date',\
         'u.deviceid',\
         'u.patchguid',\
         'u.status',\
         'u.seqid',\
         'u.mergekey')\
  .where('t.current = true and \
  (t.status <> u.status or t.actionimmediately <> u.actionimmediately)')

  return changedRecords

def getNewRecords(sourceDF,destDF):
  newRecords = sourceDF.alias('n')\
  .join(destDF.alias('t'),['deviceid','patchguid'],'left')\
  .select(\
          't.mergekey',\
          'n.actionimmediately',\
          'n.date',\
          'deviceid',\
          'patchguid',\
          'n.status',\
          'n.seqid')\
  .where('t.current is null')
  return newRecords

def upsertChangedRecords(sourceDF,destDF):
  endTimeColumn = expr("""IF(endtimeOld IS NULL, date, endtimeOld)""")
  currentColumn = expr("""IF(date IS NULL, currentOld, False)""")

  updateDF = sourceDF.alias('s').join(destDF.alias('t'),'mergekey','right').select(\
                                                                                'mergekey',\
                                                                                't.deviceid',\
                                                                                't.patchguid',\
                                                                                't.status',\
                                                                                't.actionimmediately',\
                                                                                't.starttime',\
                                                                                's.date',\
                                                                                col('t.current').alias('currentOld'),\
                                                                                col('t.endTime').alias('endtimeOld'))\
  .withColumn('endtime',endTimeColumn)\
  .withColumn('current',currentColumn)\
  .drop('currentOld','date','endTimeOld')

  updateInsertDF = sourceDF\
  .select(\
          'mergekey',\
          'deviceid',\
          'patchguid',\
          'status',\
          'actionimmediately',\
          col('date').alias('starttime'))\
  .withColumn('endtime',lit(None).cast('timestamp'))\
  .withColumn('current',lit(True))

  resultDF = updateDF.union(updateInsertDF)
  return resultDF

def insertNewRecords(sourceDF, destDF):
  insertDF = sourceDF\
  .select(\
          'mergekey',\
          'deviceid',\
          'patchguid',\
          'status',\
          'actionimmediately',\
          col('date').alias('starttime'))\
  .withColumn('endtime',lit(None).cast('timestamp'))\
  .withColumn('current',lit(True))

  resultDF = destDF.union(insertDF)

  return resultDF

for i in range(max_seq_id):
  i = i + 1
  print(i)
  seq_df = source_df.where(col('seqid') == i)
  if i == 1:
    tablecount = spark.sql("show tables like 'patch_approval'").count()
    if(tablecount == 0):
      dest_df = createDestDF(seq_df)
    else:
      changed_df = getChangedRecords(seq_df,dest_df)
      new_df = getNewRecords(seq_df,dest_df)
      dest_df = upsertChangedRecords(changed_df,dest_df)
      dest_df = insertNewRecords(new_df,dest_df)
  else:
    changed_df = getChangedRecords(seq_df,dest_df)
    new_df = getNewRecords(seq_df,dest_df)
    dest_df = upsertChangedRecords(changed_df,dest_df)
    dest_df = insertNewRecords(new_df,dest_df)

dest_df\
.write\
.format('delta')\
.partitionBy('current')\
.mode('overwrite')\
.options(header='true',path='/mnt/destinationncentral/patch_approval')\
.saveAsTable('patch_approval')

Любая идея, как я могу решить проблему увеличения времени выполнения в for l oop?

С уважением,

1 Ответ

0 голосов
/ 26 мая 2020
• 1000 1003 * Он должен быть очень быстрым и приводить к желаемому результату. Я проверил это на ваших данных образца, и после этого df_scd показывает:
+---------+--------+---------+------------+-----------------+--------------------+--------------------+-------+
| mergekey|deviceid|patchguid|      status|actionimmediately|           starttime|             endtime|current|
+---------+--------+---------+------------+-----------------+--------------------+--------------------+-------+
|12300-001|     123|   00-001|Not Approved|            False|2018-08-10 01:00:...|2018-08-15 04:01:...|  false|
|12300-001|     123|   00-001|     Install|            False|2018-08-15 04:01:...|2018-08-16 00:00:...|  false|
|12300-001|     123|   00-001|     Install|             True|2018-08-16 00:00:...|                null|   true|
|33311-111|     333|   11-111|    Declined|            False|2020-01-01 00:00:...|                null|   true|
+---------+--------+---------+------------+-----------------+--------------------+--------------------+-------+
...