Я использую конвейер Beam для подсчета частот телефонных номеров для потоковой передачи данных. Я использую скольжение windows, которое повторяется каждые 5 минут с общим периодом 15 минут, так что, как и ожидалось, для некоторых входов я получаю несколько выходов, когда вход падает в несколько windows.
После расчета количества вхождений я хотел бы найти среднее значение для функции ввода. Входные данные являются кортежами типа:
('phone_number', '123')
('phone_number', '456')
('phone_number', '456')
('phone_number', '456')
Первая часть конвейера предназначена для подсчета частоты каждого числа:
| 'window' >> beam.WindowInto(window.SlidingWindows(900, 300))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'count_occurences' >> beam.combiners.Count.PerKey()
Это правильно, мой расчет работает правильно, и я могу рассчитать частоту каждого номера, получая 3 результата, поскольку в каждом периоде есть 3 скользящих windows (в нашем случае 2 из 456 вызовов были в одном и том же окне, а третий - в другом):
(('phone_number', '123'), 1)
(('phone_number', '123'), 1)
(('phone_number', '123'), 1)
(('phone_number', '456'), 2)
(('phone_number', '456'), 2)
(('phone_number', '456'), 2)
(('phone_number', '456'), 1)
(('phone_number', '456'), 1)
(('phone_number', '456'), 1)
Теперь я хотел бы найти среднее значение для каждого номера телефона по всем вычисленным значениям окна, а именно:
(('phone_number', '123'), 1.0)
(('phone_number', '456'), 1.5)
Следующим шагом в моем конвейере будет
| 'Find Means' >> beam.combiners.Mean.PerKey()
Но это просто дает мне:
(('phone_number', '123'), 1.0)
(('phone_number', '123'), 1.0)
(('phone_number', '123'), 1.0)
(('phone_number', '456'), 2.0)
(('phone_number', '456'), 2.0)
(('phone_number', '456'), 2.0)
(('phone_number', '456'), 1.0)
(('phone_number', '456'), 1.0)
(('phone_number', '456'), 1.0)
Есть ли какой-нибудь способ сделать еще один beam.combiners расчет по результатам предыдущего?