Потоковая передача данных в режиме реального времени с использованием API Wikipedia RecentChanges - PullRequest
1 голос
/ 23 октября 2019

В последнее время я пытаюсь создать демонстрацию потокового вещания в реальном времени, используя NiFi -> Kafka -> Druid -> Superset. Для целей этой демонстрации я выбрал API-интерфейс RecentChanges * Wikipedia для получения асинхронных данных о последних изменениях.

Я использую этот URL , чтобы получить ответ об изменениях. Я вызываю постоянство API, чтобы не пропустить какие-либо изменения. Таким образом, я получаю много дубликатов, которые мне не нужны.

Есть ли способ параметризации этого API, чтобы исправить его, например, получить все изменения за предыдущую секунду и делать это каждую секунду или что-то еще для решения этой проблемы? вопрос. Я пытаюсь создать конфигурацию для этого NiFi, если кто-то должен что-то добавить к этой части, тогда посетите это обсуждение на Cloudera .

Ответы [ 2 ]

2 голосов
/ 23 октября 2019

Да. См. https://en.wikipedia.org/w/api.php?action=help&modules=query%2Brecentchanges Используйте rcstart и rcend для определения времени начала и окончания. Вы можете использовать «сейчас» для rcend.

1 голос
/ 25 октября 2019

Я хочу расширить ответ smartse и найти решение. Вы хотите поместить свой запрос API в определенные временные окна, смещая начальный и конечный параметры. Работа с окнами может работать следующим образом:

  • Инициализировать параметры начальной, конечной отметки времени
  • Поместить эти параметры в качестве атрибутов в поток
  • Нижестоящие процессоры могут вызывать API, используя эти параметры
  • После этого вы должны установить start = previous_end + 1 секунду и end = now

Когда вы определяете новое окно для следующего запуска, вам нужны параметры изпредыдущий прогон. Вот почему вы должны помнить эти ценности. Вы можете добиться этого, используя распределенный кэш карт NiFi.

Я собрал для вас поток:

enter image description here

Увеличение Get next date range:

enter image description here

Конечный параметр всегда теперь, так что вам просто нужно сохранить начальный параметр. FetchDistributedMapCache выберет это для вас и поместит в атрибут stored.state:

enter image description here

Set time range процессор инициализирует параметры:

enter image description here

Обратите внимание, что конец всегда наступает, и начало - это либо начальная дата (для первого запуска), либо последний параметр окончания плюс 1 секунда. В этот момент поток направляется в вывод Time range, где вы можете вызвать свой API в нисходящем направлении. Кроме того, вы должны обновить хранимое значение. Это происходит в ReplaceText процессоре:

enter image description here

Наконец вы обновляете состояние:

enter image description here

Жизненный цикл параметров привязан к идентификатору кэша. Когда вы меняете идентификатор, вы начинаете с нуля.

...