Apache Beam - Python: как получить 10 лучших элементов PCollection с накоплением? - PullRequest
0 голосов
/ 16 июня 2019

Я хотел бы получить 10 лучших результатов, как это:

Paul - 38
Michel - 27
Hugo - 27
Bob - 24
Kevin - 19
...
(10 elements)

Я использую фиксированное окно и управляемый данными триггер, который выводит ранние результаты после того, как панель собрала X элементов. Кроме того, я использую комбинатор, чтобы получить 10 лучших результатов.

(inputs
         | 'Apply Window of time' >> beam.WindowInto(
                        beam.window.FixedWindows(size=5 * 60))
                        trigger=trigger.Repeatedly(trigger.AfterCount(2)),
                  accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
         | beam.ParDo(PairWithOne()) # ('key', 1)
         | beam.CombinePerKey(sum)
         | 'Top 10 scores' >> beam.CombineGlobally(
                        beam.combiners.TopCombineFn(n=10,
                                                    compare=lambda a, b: a[1] < b[
                                                        1])).without_defaults())

Проблема здесь в том, что первый вывод кажется правильным, но следующие выходы содержат дублированные ключи, подобные этому:

Paul - 38
Paul - 37
Michel - 27
Paul - 36
Michel - 26
Kevin - 20
...
(10 elements)

Как видите, я получаю не 10 различных пар K / V, а дублированные ключи.

Когда не используется стратегия триггера / накопления, это работает хорошо ... но если я хочу иметь окно на 2 часа, я бы хотел получать частые обновления ...

1 Ответ

1 голос
/ 16 июня 2019

Как обсуждалось в комментариях, одна из возможностей - перейти к Отбрасывание запущенных панелей , которые можно установить с помощью accumulation_mode=trigger.AccumulationMode.DISCARDING.Если вы все еще хотите сохранить режим ACCUMULATING, вы можете изменить TopCombineFn, чтобы повторяющиеся панели одного и того же пользователя перезаписывали предыдущее значение и избегали дублирования клавиш.TopDistinctFn примет за основу код здесь для Beam SDK 2.13.0.В методе add_input мы сделаем предыдущую проверку следующим образом:

for current_top_element in enumerate(heap):
  if element[0] == current_top_element[1].value[0]:
    heap[current_top_element[0]] = heap[-1]
    heap.pop()
    heapq.heapify(heap)

По сути, мы сравним ключ для элемента, который мы оцениваем (element[0]), с каждым элементом вкуча.Элементы кучи имеют тип ComparableValue, поэтому мы можем использовать value для возврата кортежа (и value[0] для получения ключа).Если они совпадают, мы хотим вытащить их из кучи (по мере накопления сумма будет больше).Beam SDK использует библиотеку heapq, поэтому я основал свой подход на этом ответе для удаления элемента i-th (мы используем enumerate для хранения информации индекса).

Я добавил некоторые записи в журнале, чтобы иметь возможность обнаруживать дубликаты:

logging.info("Duplicate: " + element[0] + "," + str(element[1]) + ' --- ' + current_top_element[1].value[0] + ',' + str(current_top_element[1].value[1]))

Код находится в файле top.py внутри папки combiners__init__.py) иЯ импортирую его с помощью:

from combiners.top import TopDistinctFn

Затем я могу использовать TopDistinctFn из конвейера следующим образом:

(inputs
     | 'Add User as key' >> beam.Map(lambda x: (x, 1)) # ('key', 1)
     | 'Apply Window of time' >> beam.WindowInto(
                    beam.window.FixedWindows(size=10*60),
                    trigger=beam.trigger.Repeatedly(beam.trigger.AfterCount(2)),
                    accumulation_mode=beam.trigger.AccumulationMode.ACCUMULATING)
     | 'Sum Score' >> beam.CombinePerKey(sum)   
     | 'Top 10 scores' >> beam.CombineGlobally(
                    TopDistinctFn(n=10, compare=lambda a, b: a[1] < b[1])).without_defaults()
     | 'Print results' >> beam.ParDo(PrintTop10Fn()))

Полный код можно найти здесь .generate_messages.py - генератор сообщений Pub / Sub, top.py содержит пользовательскую версию TopCombineFn, переименованную в TopDistinctFn (может показаться подавляющим, но я добавил только несколько строк кода, начиная со строки 425) и test_combine.py основнойкод конвейера.Для этого вы можете поместить файлы в правильную папку, установить Beam SDK 2.13.0, если необходимо, изменить идентификатор проекта и тему Pub / Sub в generate_messages.py и test_combine-py.Затем начните публиковать сообщения с python generate_messages.py и в другой оболочке запустите конвейер с DirectRunner: python test_combine.py --streamingDataflowRunner вам, вероятно, потребуется добавить дополнительные файлы с файлом setup.py.

Например, Bob лидировал с 9 точками, а когда следующийприходит обновление, его оценка до 11 баллов.Он появится в следующем резюме с только обновленным счетом и без дубликатов (как обнаружено в нашей регистрации).Запись с 9 точками не будет отображаться, и все же топ будет иметь 10 пользователей по желанию.Аналогично для Marta.Я заметил, что старые оценки по-прежнему отображаются в куче, даже если они не входят в топ-10, но я не уверен, как работает сборка мусора с heapq.

INFO:root:>>> Current top 10: [('Bob', 9), ('Connor', 8), ('Eva', 7), ('Hugo', 7), ('Paul', 6), ('Kevin', 6), ('Laura', 6), ('Marta', 6), ('Diane', 4), ('Bacon', 4)]
...
INFO:root:Duplicate: Marta,8 --- Marta,6
INFO:root:Duplicate: Bob,11 --- Bob,9
INFO:root:>>> Current top 10: [('Bob', 11), ('Connor', 8), ('Marta', 8), ('Bacon', 7), ('Eva', 7), ('Hugo', 7), ('Paul', 6), ('Laura', 6), ('Diane', 6), ('Kevin', 6)]

Дайте мне знать, хорошо ли это работает для васдело тоже.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...