Как обсуждалось в комментариях, одна из возможностей - перейти к Отбрасывание запущенных панелей , которые можно установить с помощью accumulation_mode=trigger.AccumulationMode.DISCARDING
.Если вы все еще хотите сохранить режим ACCUMULATING
, вы можете изменить TopCombineFn
, чтобы повторяющиеся панели одного и того же пользователя перезаписывали предыдущее значение и избегали дублирования клавиш.TopDistinctFn
примет за основу код здесь для Beam SDK 2.13.0.В методе add_input
мы сделаем предыдущую проверку следующим образом:
for current_top_element in enumerate(heap):
if element[0] == current_top_element[1].value[0]:
heap[current_top_element[0]] = heap[-1]
heap.pop()
heapq.heapify(heap)
По сути, мы сравним ключ для элемента, который мы оцениваем (element[0]
), с каждым элементом вкуча.Элементы кучи имеют тип ComparableValue
, поэтому мы можем использовать value
для возврата кортежа (и value[0]
для получения ключа).Если они совпадают, мы хотим вытащить их из кучи (по мере накопления сумма будет больше).Beam SDK использует библиотеку heapq
, поэтому я основал свой подход на этом ответе для удаления элемента i-th
(мы используем enumerate
для хранения информации индекса).
Я добавил некоторые записи в журнале, чтобы иметь возможность обнаруживать дубликаты:
logging.info("Duplicate: " + element[0] + "," + str(element[1]) + ' --- ' + current_top_element[1].value[0] + ',' + str(current_top_element[1].value[1]))
Код находится в файле top.py
внутри папки combiners
(с __init__.py
) иЯ импортирую его с помощью:
from combiners.top import TopDistinctFn
Затем я могу использовать TopDistinctFn
из конвейера следующим образом:
(inputs
| 'Add User as key' >> beam.Map(lambda x: (x, 1)) # ('key', 1)
| 'Apply Window of time' >> beam.WindowInto(
beam.window.FixedWindows(size=10*60),
trigger=beam.trigger.Repeatedly(beam.trigger.AfterCount(2)),
accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING)
| 'Sum Score' >> beam.CombinePerKey(sum)
| 'Top 10 scores' >> beam.CombineGlobally(
TopDistinctFn(n=10, compare=lambda a, b: a[1] < b[1])).without_defaults()
| 'Print results' >> beam.ParDo(PrintTop10Fn()))
Полный код можно найти здесь .generate_messages.py
- генератор сообщений Pub / Sub, top.py
содержит пользовательскую версию TopCombineFn
, переименованную в TopDistinctFn
(может показаться подавляющим, но я добавил только несколько строк кода, начиная со строки 425) и test_combine.py
основнойкод конвейера.Для этого вы можете поместить файлы в правильную папку, установить Beam SDK 2.13.0, если необходимо, изменить идентификатор проекта и тему Pub / Sub в generate_messages.py
и test_combine-py
.Затем начните публиковать сообщения с python generate_messages.py
и в другой оболочке запустите конвейер с DirectRunner
: python test_combine.py --streaming
.С DataflowRunner
вам, вероятно, потребуется добавить дополнительные файлы с файлом setup.py
.
Например, Bob
лидировал с 9 точками, а когда следующийприходит обновление, его оценка до 11 баллов.Он появится в следующем резюме с только обновленным счетом и без дубликатов (как обнаружено в нашей регистрации).Запись с 9 точками не будет отображаться, и все же топ будет иметь 10 пользователей по желанию.Аналогично для Marta
.Я заметил, что старые оценки по-прежнему отображаются в куче, даже если они не входят в топ-10, но я не уверен, как работает сборка мусора с heapq
.
INFO:root:>>> Current top 10: [('Bob', 9), ('Connor', 8), ('Eva', 7), ('Hugo', 7), ('Paul', 6), ('Kevin', 6), ('Laura', 6), ('Marta', 6), ('Diane', 4), ('Bacon', 4)]
...
INFO:root:Duplicate: Marta,8 --- Marta,6
INFO:root:Duplicate: Bob,11 --- Bob,9
INFO:root:>>> Current top 10: [('Bob', 11), ('Connor', 8), ('Marta', 8), ('Bacon', 7), ('Eva', 7), ('Hugo', 7), ('Paul', 6), ('Laura', 6), ('Diane', 6), ('Kevin', 6)]
Дайте мне знать, хорошо ли это работает для васдело тоже.