Поток данных: использование beam.combiners по результатам предыдущего beam.combiners - PullRequest
0 голосов
/ 18 марта 2020

Я использую конвейер Beam для подсчета частот телефонных номеров для потоковой передачи данных. Я использую скольжение windows, которое повторяется каждые 5 минут с общим периодом 15 минут, так что, как и ожидалось, для некоторых входов я получаю несколько выходов, когда вход падает в несколько windows.

После расчета количества вхождений я хотел бы найти среднее значение для функции ввода. Входные данные являются кортежами типа:

('phone_number', '123')
('phone_number', '456')
('phone_number', '456')
('phone_number', '456')

Первая часть конвейера предназначена для подсчета частоты каждого числа:

| 'window' >> beam.WindowInto(window.SlidingWindows(900, 300))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'count_occurences' >> beam.combiners.Count.PerKey()

Это правильно, мой расчет работает правильно, и я могу рассчитать частоту каждого номера, получая 3 результата, поскольку в каждом периоде есть 3 скользящих windows (в нашем случае 2 из 456 вызовов были в одном и том же окне, а третий - в другом):

(('phone_number', '123'), 1)
(('phone_number', '123'), 1)
(('phone_number', '123'), 1)
(('phone_number', '456'), 2)
(('phone_number', '456'), 2)
(('phone_number', '456'), 2)
(('phone_number', '456'), 1)
(('phone_number', '456'), 1)
(('phone_number', '456'), 1)

Теперь я хотел бы найти среднее значение для каждого номера телефона по всем вычисленным значениям окна, а именно:

(('phone_number', '123'), 1.0)
(('phone_number', '456'), 1.5)

Следующим шагом в моем конвейере будет

| 'Find Means' >> beam.combiners.Mean.PerKey()

Но это просто дает мне:

(('phone_number', '123'), 1.0)
(('phone_number', '123'), 1.0)
(('phone_number', '123'), 1.0)
(('phone_number', '456'), 2.0)
(('phone_number', '456'), 2.0)
(('phone_number', '456'), 2.0)
(('phone_number', '456'), 1.0)
(('phone_number', '456'), 1.0)
(('phone_number', '456'), 1.0)

Есть ли какой-нибудь способ сделать еще один beam.combiners расчет по результатам предыдущего?

1 Ответ

0 голосов
/ 18 марта 2020

Причина, по которой beam.combiners.Mean.PerKey () выдает неправильный вывод, состоит в том, что объединитель выдает единственное значение, рассчитанное для каждой клавиши + окна.

Однако здесь есть еще кое-что. Причиной создания окон при потоковой обработке является обеспечение того, что входные данные ограничены до получения результата. То есть, как правило, входные данные для потоковых конвейеров не ограничены, то есть они никогда не прекращают прием данных, если конвейер не завершен. Поэтому невозможно вычислить значение по всем windows, так как вам нужно будет ждать вечно.

Мне кажется, вы пытаетесь вычислить "Среднее число вхождений телефонного номера". в 15-минутном окне, при сравнении всех возможных скольжений 15 минут windows, при скольжении каждые 5 минут ". Если это не так, пожалуйста, уточните, чтобы помочь мне понять

Так как нам нужно каким-то образом ограничить вычисление, может быть возможно периодически выводить результат, то есть для каждого окна, и продолжать выводить новый результат, обновляя его, пока не закончится конвейер. Это должно быть возможно с StatefulDoFn.

. Для этого я рекомендую:

  • Вывести значения из вашего скользящего windows в Global Windows
  • Сохранение суммы и количества для вычисления среднего значения в StatefulDoFn
  • Вывод вычисленного среднего значения периодически или для каждого элемента и обновление результата в нисходящем направлении (т.е. перезапись той же строки в BigQuery или удаление лишних строк при проверке таблицы BigQuery с помощью SQL)

Примерно так:

class ComputeMeanStatefulDoFn(DoFn):
  TOTAL_STATE = CombiningStateSpec('total', sum)
  COUNT_STATE = CombiningStateSpec('count', sum)

  def process(self, element,
      total=DoFn.StateParam(TOTAL_STATE),
      count=DoFn.StateParam(COUNT_STATE)):
    key_phone_number, value_window_count = element
    current_count = count.read() + 1
    current_total = total.read() + value_window_count
    mean = current_total / current_count
    # You can emit every N results to reduce the volume
    # but please make sure to at least emit the first M << N results
    yield (key_phone_number, mean)
    total.add(value_window_count)
    count.add(1)

| 'window' >> beam.WindowInto(window.SlidingWindows(900, 300))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'count_occurences' >> beam.combiners.Count.PerKey()
| 'window_globally' >> beam.WindowInto(window.GlobalWindows)
| 'compute_mean_across_windows' >> beam.ParDo(ComputeMeanStatefulDoFn)

По сути, здесь происходит то, что сумма и число сохраняются to persistance / disk, и мы пересчитываем новое среднее значение каждый раз, когда в глобальное окно поступает новый элемент.

Примечание. Вам придется иметь дело с выдачей обновленного среднего значения для одного и того же ключа несколько раз. Т.е. вы можете sh перезаписать строку в таблице BigQuery, содержащую ваши результаты.

Примечание: В зависимости от семантики, которую вы также пытаетесь вычислить, вы можете захотеть выдать пустую windows из функции Sliding Windows, чтобы они были включены в вычисление среднего значения на выходе.

Примечание: Вы не можете использовать Combine.globally здесь, поскольку это никогда не закончится, из-за характера неограниченного ввода в потоковом конвейере. Я считаю, что это может привести к ошибке, если вы попытаетесь запустить такой конвейер.

...