У меня есть проблема, которую я пытаюсь решить в Spark, определяя свой собственный UDAF, пытаясь подражать рекомендациям, данным здесь и здесь .Моя конечная цель состоит в том, чтобы применить ряд сложных битовых сдвиговых и побитовых логических манипуляций к последовательности целых чисел в данном окне.
У меня возникли проблемы, поскольку мой сценарий использования находится в довольно большом наборе данных(~ 100 миллионов строк, для которых мне нужно выполнить 6 таких побитовых манипуляций с группами длиной от 2-7 элементов), и поэтому я пытаюсь реализовать это в scala.Проблема в том, что я новичок в scala (мой основной язык - python), и хотя сама scala не кажется такой уж сложной, сочетание нового языка плюс специфика самого класса UDAF применительно к window
sоставляет меня немного озадаченным.
Объяснение логики на примере в python / pandas
Чтобы сделать вопрос более конкретным, рассмотрим pandas
DataFrame
:
keep = list(range(30))
for num in (3, 5, 11, 16, 22, 24):
keep.pop(num)
np.random.seed(100)
df = pd.DataFrame({
'id': 'A',
'date': pd.date_range('2018-06-01', '2018-06-30')[keep],
'num': np.random.randint(low=1, high=100, size=30)[keep]
})
Который производит:
id date num
0 A 2018-06-01 9
1 A 2018-06-02 25
2 A 2018-06-03 68
3 A 2018-06-05 80
4 A 2018-06-06 49
5 A 2018-06-08 95
6 A 2018-06-09 53
7 A 2018-06-10 99
8 A 2018-06-11 54
9 A 2018-06-12 67
10 A 2018-06-13 99
11 A 2018-06-15 35
12 A 2018-06-16 25
13 A 2018-06-17 16
14 A 2018-06-18 61
15 A 2018-06-19 59
16 A 2018-06-21 10
17 A 2018-06-22 94
18 A 2018-06-23 87
19 A 2018-06-24 3
20 A 2018-06-25 28
21 A 2018-06-26 5
22 A 2018-06-28 2
23 A 2018-06-29 14
Я хотел бы иметь возможность относительно текущей строки найти количество дней,затем выполните некоторые побитовые манипуляции на основе этого значения.Чтобы продемонстрировать, оставаясь в пандах (мне нужно будет выполнить полное внешнее объединение, затем отфильтровать, чтобы продемонстрировать):
exp_df = df[['id', 'date']].merge(df, on='id') \ # full outer join on 'id'
.assign(days_diff = lambda df: (df['date_y'] - df['date_x']).dt.days) \ # number of days since my date of interest
.mask(lambda df: (df['days_diff'] > 3) | (df['days_diff'] < 0)) \ # nulls rows where days_diff isn't between 0 and 3
.dropna() \ # then filters the rows
.drop('date_y', axis='columns') \
.rename({'date_x': 'date', 'num': 'nums'}, axis='columns') \
.reset_index(drop=True)
exp_df[['nums', 'days_diff']] = exp_df[['nums', 'days_diff']].astype('int')
Теперь я выполняю свое побитовое смещение и другую логику:
# Extra values to add after bit-wise shifting (1 for shift of 1, 3 for shift of 2 ...)
additions = {val: sum(2**power for power in range(val)) for val in exp_df['days_diff'].unique()}
exp_df['shifted'] = np.left_shift(exp_df['nums'].values, exp_df['days_diff'].values) \
+ exp_df['days_diff'].apply(lambda val: additions[val])
После всего этого exp_df
выглядит следующим образом (первые 10 строк):
id date nums days_diff shifted
0 A 2018-06-01 9 0 9
1 A 2018-06-01 25 1 51
2 A 2018-06-01 68 2 275
3 A 2018-06-02 25 0 25
4 A 2018-06-02 68 1 137
5 A 2018-06-02 80 3 647
6 A 2018-06-03 68 0 68
7 A 2018-06-03 80 2 323
8 A 2018-06-03 49 3 399
9 A 2018-06-05 80 0 80
Теперь я могу агрегировать:
exp_df.groupby('date')['shifted'].agg(lambda group_vals: np.bitwise_and.reduce(group_vals.values)
И конечный результат выглядит следующим образом(если я вернусь к исходному DataFrame
:
id date num shifted
0 A 2018-06-01 9 1
1 A 2018-06-02 25 1
2 A 2018-06-03 68 0
3 A 2018-06-05 80 64
4 A 2018-06-06 49 33
5 A 2018-06-08 95 3
6 A 2018-06-09 53 1
7 A 2018-06-10 99 1
8 A 2018-06-11 54 6
9 A 2018-06-12 67 3
10 A 2018-06-13 99 3
11 A 2018-06-15 35 3
12 A 2018-06-16 25 1
13 A 2018-06-17 16 0
14 A 2018-06-18 61 21
15 A 2018-06-19 59 35
16 A 2018-06-21 10 8
17 A 2018-06-22 94 6
18 A 2018-06-23 87 3
19 A 2018-06-24 3 1
20 A 2018-06-25 28 0
21 A 2018-06-26 5 1
22 A 2018-06-28 2 0
23 A 2018-06-29 14 14
Назад к вопросу
Хорошо, теперь, когда я продемонстрировал свою логику, я понимаю, что по сути могуТо же самое в Spark - выполнение полного внешнего объединения DataFrame с самим собой, затем фильтрация и агрегирование.
Что я хочу знать, так это то, могу ли я избегать выполнения полного объединения, и вместо этогосоздать свой собственный UDAF для выполнения этого агрегирования по оконной функции, используя целевую строку в качестве входных данных. По сути, мне нужно создать эквивалент столбца "days_diff"
, чтобы выполнить требуемую логику, что означает сравнение целевой даты скаждый из тдругие даты в моем указанном окне.Возможно ли это вообще?
Кроме того, могу ли я вообще беспокоиться об использовании самостоятельного соединения?Я знаю, что искра выполняет всю свою обработку лениво, поэтому очень возможно, что мне не нужно было бы беспокоиться.Должен ли я ожидать, что производительность будет схожей, если я буду делать все это с использованием самопостроения по сравнению с моим воображаемым UDAF, примененным к окну?Логика более последовательная и ее легче отслеживать с помощью метода join-filter-aggregate, что является явным преимуществом.
Следует знать, что я буду выполнять эту логику для нескольких окон.В принципе, я мог бы cache
самая большая версия отфильтрованного фрейма данных после объединения, а затем использовать ее для последующих вычислений.