низкая производительность функции pyspark, преобразованной из кода plsql в pySpark, поскольку цикл снижает производительность - PullRequest
0 голосов
/ 21 июня 2019

Я конвертирую функцию PLSQL в код pyspark для проекта миграции. Существующий сценарий: Запрос Informatica SQ имеет sql, который вызывает функцию PLSQL. Например, выберите empid, job, AssingmentStatus (empid, joiningDate) из d_emp. Чтобы преобразовать приведенный выше сценарий в код pyspark, я взял указанный выше запрос в качестве информационного кадра и, используя df.transform, пытаюсь вызвать функцию «AssingmentStatus». Функция «AssingmentStatus» я уже переписал в коде pyspark. Ниже приведен пример того, как я это сделал. у d_emp ~ 7 миллионов записей

 # ~ 7  million records insq_df
SQ_df = spark.sql("select empid, job,joiningDate from d_emp")

# Created empty DF first.
tmp_empty_df = spark.sql("select empid, joiningdate from d_emp limit 0 ")
tmp_empty_df  = tmp_empty_df .withColumn('Assignstatus', lit(0))

for row in sq_df.collect():
   i_empid = row['empid']
   i_joiningDate = row['joiningDate]

 # createed DF for one row
   row_df = spark.sql("select {0} as empid,{1} as 
            joiningDate".format(i_empid ,i_joiningDate ))
  #use transform to call function AssingmentStatus
   return_df = (row_df.transform(lambda row_df: AssingmentStatus(empid, 
        joiningDate)))
  # I will get 0 or 1 as return status from function
    t_assignedstatus = return_df .head()[0]
 #create a df for each row
    assignmentdf = test_df.withColumn("t_assignedstatus", 
      lit(t_assignedstatus))
   # append each df coming for each row to empty df
     tmp_empty_df  = tmp_empty_df.union(assignmentdf)
   #After for loop ends get final DF.
      tmp_emp_activity_fn_status.show()

Функция AssingmentStatus имеет большие логические и рекурсивные вызовы в зависимости от условия, я изменил курсор plsql на dataframe и использовал синтаксис pyspark для перекодирования функции plsql (это большой код, поэтому здесь не вставлять) Но Return будет 0 или 1 при конец.

Мой код работает хорошо для нескольких записей, например, 100, но если я хочу запустить его для всех записей ~ 7 миллионов, он будет работать вечно.

Могу ли я улучшить производительность? или любой другой способ я могу написать эту функциональность? Я новичок, чтобы зажечь, поэтому любая помощь очень ценится.

...