Функция collect () для spark занимает слишком много времени для выполнения.Есть ли другая альтернатива для оптимизации кода? - PullRequest
0 голосов
/ 04 января 2019
sourceChannel = SourceChannel()
cur_df_rows = cur_df.collect()
cur_df_row = cur_df_rows[0]
agnt_chan = cur_df_row['agentChannel']
agnt_br_code = cur_df_row['agentBranchCode']
fo_br_code = cur_df_row['focusBranchCode']
#print("------------------------")
#print(agnt_chan)
#print(agnt_br_code)
#print("------------------------")
#sys.exit()
time_po = time.time()

so_chan = sourceChannel.get_policy_source_chan(
    agnt_chan, agnt_br_code, fo_br_code)

time_so_chan = time.time() - time_po
loggerObj.info(json.dumps(
    {"time taken to add policy chan": time_so_chan}))
with_so_chan = cur_df.withColumn('sourceChannel', lit(so_chan))\
                            .drop('focusBranchCode')
pRlnWithInsured = sprint_fields['ProposerRelationshipwithInsured'].lower()
if pRlnWithInsured == 'sds' or pRlnWithInsured == 'self':
    return with_so_chan.withColumn('selfInsured', lit('Y'))
else:
    return with_so_chan.withColumn('selfInsured', lit('N'))

cur_df_rows = cur_df.collect () сама строка занимает 2 минуты для выполнения время от времени. Мы используем кластер с одним узлом.

1 Ответ

0 голосов
/ 04 января 2019

заменить

cur_df_rows = cur_df.collect()

с

cur_df_rows = cur_df.first().collect()

вы соберете только первый ряд вместо целого df

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...