Может быть, это какая-то магия, но решение следующее :.
Функция output_vwap должна иметь вывод в виде вывода:
def calculate_vwap(ric_id, interval_start, interval_finish, vwap_data, row_n):
some_tmp_vwap_interval_data = \
vwap_data.query(
'TKER == @ric_id and interval > @interval_start and interval < '
'@interval_finish '
)[['IVWP', 'INTV']]
if sum(some_tmp_vwap_interval_data['INTV']):
return \
sum(
vwap * volume for vwap, volume in
zip(some_tmp_vwap_interval_data['IVWP'],
some_tmp_vwap_interval_data['INTV'])
) \
/ sum(some_tmp_vwap_interval_data['INTV']), \
some_tmp_vwap_interval_data.IVWP.iloc[0], \
some_tmp_vwap_interval_data.IVWP.iloc[-1], \
some_tmp_vwap_interval_data.INTV.sum()
return None
после этого этот вывод должен быть преобразован в фрейм данных:
pd.DataFrame(
dd.from_pandas(crosses_data[[
'RIC', 'Interval_Start_Human',
'Interval_End_Human']],
npartitions=int(partitions_number)).
map_partitions(
lambda df: df.apply((
lambda row: calculate_vwap(row[0],
row[1],
row[2],
vwap_data,
row.name)),
axis=1)).
compute(scheduler=get).values.tolist())