Можно ли использовать целевую строку для инициализации UDAF Spark? - PullRequest
0 голосов
/ 10 октября 2018

У меня есть проблема, которую я пытаюсь решить в Spark, определяя свой собственный UDAF, пытаясь подражать рекомендациям, данным здесь и здесь .Моя конечная цель состоит в том, чтобы применить ряд сложных битовых сдвиговых и побитовых логических манипуляций к последовательности целых чисел в данном окне.

У меня возникли проблемы, поскольку мой сценарий использования находится в довольно большом наборе данных(~ 100 миллионов строк, для которых мне нужно выполнить 6 таких побитовых манипуляций с группами длиной от 2-7 элементов), и поэтому я пытаюсь реализовать это в scala.Проблема в том, что я новичок в scala (мой основной язык - python), и хотя сама scala не кажется такой уж сложной, сочетание нового языка плюс специфика самого класса UDAF применительно к window sоставляет меня немного озадаченным.

Объяснение логики на примере в python / pandas

Чтобы сделать вопрос более конкретным, рассмотрим pandas DataFrame:

keep = list(range(30))
for num in (3, 5, 11, 16, 22, 24):
    keep.pop(num)
np.random.seed(100)
df = pd.DataFrame({
    'id': 'A',
    'date': pd.date_range('2018-06-01', '2018-06-30')[keep],
    'num': np.random.randint(low=1, high=100, size=30)[keep]
})

Который производит:

   id       date  num
0   A 2018-06-01    9
1   A 2018-06-02   25
2   A 2018-06-03   68
3   A 2018-06-05   80
4   A 2018-06-06   49
5   A 2018-06-08   95
6   A 2018-06-09   53
7   A 2018-06-10   99
8   A 2018-06-11   54
9   A 2018-06-12   67
10  A 2018-06-13   99
11  A 2018-06-15   35
12  A 2018-06-16   25
13  A 2018-06-17   16
14  A 2018-06-18   61
15  A 2018-06-19   59
16  A 2018-06-21   10
17  A 2018-06-22   94
18  A 2018-06-23   87
19  A 2018-06-24    3
20  A 2018-06-25   28
21  A 2018-06-26    5
22  A 2018-06-28    2
23  A 2018-06-29   14

Я хотел бы иметь возможность относительно текущей строки найти количество дней,затем выполните некоторые побитовые манипуляции на основе этого значения.Чтобы продемонстрировать, оставаясь в пандах (мне нужно будет выполнить полное внешнее объединение, затем отфильтровать, чтобы продемонстрировать):

exp_df = df[['id', 'date']].merge(df, on='id') \ # full outer join on 'id'
                           .assign(days_diff = lambda df: (df['date_y'] - df['date_x']).dt.days) \ # number of days since my date of interest
                           .mask(lambda df: (df['days_diff'] > 3) | (df['days_diff'] < 0)) \ # nulls rows where days_diff isn't between 0 and 3
                           .dropna() \ # then filters the rows
                           .drop('date_y', axis='columns') \
                           .rename({'date_x': 'date', 'num': 'nums'}, axis='columns') \
                           .reset_index(drop=True)
exp_df[['nums', 'days_diff']] = exp_df[['nums', 'days_diff']].astype('int')

Теперь я выполняю свое побитовое смещение и другую логику:

# Extra values to add after bit-wise shifting (1 for shift of 1, 3 for shift of 2 ...)
additions = {val: sum(2**power for power in range(val)) for val in exp_df['days_diff'].unique()}
exp_df['shifted'] = np.left_shift(exp_df['nums'].values, exp_df['days_diff'].values) \
                    + exp_df['days_diff'].apply(lambda val: additions[val])

После всего этого exp_df выглядит следующим образом (первые 10 строк):

  id       date  nums  days_diff  shifted
0  A 2018-06-01     9          0        9
1  A 2018-06-01    25          1       51
2  A 2018-06-01    68          2      275
3  A 2018-06-02    25          0       25
4  A 2018-06-02    68          1      137
5  A 2018-06-02    80          3      647
6  A 2018-06-03    68          0       68
7  A 2018-06-03    80          2      323
8  A 2018-06-03    49          3      399
9  A 2018-06-05    80          0       80

Теперь я могу агрегировать:

exp_df.groupby('date')['shifted'].agg(lambda group_vals: np.bitwise_and.reduce(group_vals.values)

И конечный результат выглядит следующим образом(если я вернусь к исходному DataFrame:

   id       date  num  shifted
0   A 2018-06-01    9        1
1   A 2018-06-02   25        1
2   A 2018-06-03   68        0
3   A 2018-06-05   80       64
4   A 2018-06-06   49       33
5   A 2018-06-08   95        3
6   A 2018-06-09   53        1
7   A 2018-06-10   99        1
8   A 2018-06-11   54        6
9   A 2018-06-12   67        3
10  A 2018-06-13   99        3
11  A 2018-06-15   35        3
12  A 2018-06-16   25        1
13  A 2018-06-17   16        0
14  A 2018-06-18   61       21
15  A 2018-06-19   59       35
16  A 2018-06-21   10        8
17  A 2018-06-22   94        6
18  A 2018-06-23   87        3
19  A 2018-06-24    3        1
20  A 2018-06-25   28        0
21  A 2018-06-26    5        1
22  A 2018-06-28    2        0
23  A 2018-06-29   14       14

Назад к вопросу

Хорошо, теперь, когда я продемонстрировал свою логику, я понимаю, что по сути могуТо же самое в Spark - выполнение полного внешнего объединения DataFrame с самим собой, затем фильтрация и агрегирование.

Что я хочу знать, так это то, могу ли я избегать выполнения полного объединения, и вместо этогосоздать свой собственный UDAF для выполнения этого агрегирования по оконной функции, используя целевую строку в качестве входных данных. По сути, мне нужно создать эквивалент столбца "days_diff", чтобы выполнить требуемую логику, что означает сравнение целевой даты скаждый из тдругие даты в моем указанном окне.Возможно ли это вообще?

Кроме того, могу ли я вообще беспокоиться об использовании самостоятельного соединения?Я знаю, что искра выполняет всю свою обработку лениво, поэтому очень возможно, что мне не нужно было бы беспокоиться.Должен ли я ожидать, что производительность будет схожей, если я буду делать все это с использованием самопостроения по сравнению с моим воображаемым UDAF, примененным к окну?Логика более последовательная и ее легче отслеживать с помощью метода join-filter-aggregate, что является явным преимуществом.

Следует знать, что я буду выполнять эту логику для нескольких окон.В принципе, я мог бы cache самая большая версия отфильтрованного фрейма данных после объединения, а затем использовать ее для последующих вычислений.

...