sourceChannel = SourceChannel()
cur_df_rows = cur_df.collect()
cur_df_row = cur_df_rows[0]
agnt_chan = cur_df_row['agentChannel']
agnt_br_code = cur_df_row['agentBranchCode']
fo_br_code = cur_df_row['focusBranchCode']
#print("------------------------")
#print(agnt_chan)
#print(agnt_br_code)
#print("------------------------")
#sys.exit()
time_po = time.time()
so_chan = sourceChannel.get_policy_source_chan(
agnt_chan, agnt_br_code, fo_br_code)
time_so_chan = time.time() - time_po
loggerObj.info(json.dumps(
{"time taken to add policy chan": time_so_chan}))
with_so_chan = cur_df.withColumn('sourceChannel', lit(so_chan))\
.drop('focusBranchCode')
pRlnWithInsured = sprint_fields['ProposerRelationshipwithInsured'].lower()
if pRlnWithInsured == 'sds' or pRlnWithInsured == 'self':
return with_so_chan.withColumn('selfInsured', lit('Y'))
else:
return with_so_chan.withColumn('selfInsured', lit('N'))
Сама строка `cur_df_rows = cur_df.collect()` время от времени выполняется около 2 минут. Мы используем кластер с одним узлом.