Я пытаюсь вставить данные из стороннего API в конвейер потока данных.Так как сторонние поставщики не предоставляют доступ к веб-сайтам, я написал собственный скрипт, который постоянно опрашивает их конечную точку для получения дополнительных данных.
Данные обновляются каждые 15 минут, но так как я не хочу пропускать точки данныхи я хочу потреблять, как только появляются новые данные, мой «сканер» запускается каждую 1 минуту.Затем скрипт отправляет данные в тему PubSub.Легко видеть, что PubSub будет получать около 15 повторных сообщений для каждой точки данных в источнике.
Моя первая попытка идентифицировать и отбросить эти повторные сообщения состояла в том, чтобы добавить пользовательский атрибут к каждому сообщению PubSub (eventid
),создан из хеша его [ID + updated_time] в источнике.
const attributes = {
eventid: Buffer.from(`${item.lastupdate}|${item.segmentid}`).toString('base64'),
timestamp: item.timestamp.toString()
};
const dataBuffer = Buffer.from(JSON.stringify(item))
publisher.publish(dataBuffer, attributes)
Затем я настроил поток данных с withIdAttribute()
(который является новым idLabel()
, основанным на идентификаторы записи ).
PCollection<String> input = p
.apply("ReadFromPubSub", PubsubIO
.readStrings()
.fromTopic(String.format("projects/%s/topics/%s", options.getProject(), options.getIncomingDataTopic()))
.withTimestampAttribute("timestamp")
.withIdAttribute("eventid"))
.apply("OutputToBigQuery", ...)
С этой реализацией я ожидал, что когда сценарий отправит один и тот же объект данных во второй раз, повторный eventid
будет таким же, а сообщение отброшено,Но по какой-то причине я все еще вижу дубликаты в выходном наборе данных.
Некоторые вопросы:
- Есть ли умный способ ввода данных в поток данных из стороннего API, если они не предоставляют веб-хуков?
- Есть идеи?почему поток данных не отбрасывает сообщения об этой ситуации?
- Я знаю о 10-минутном ограничении дедупликации в потоке данных, но я вижу дублированные данные даже при 2-й вставке (2 минуты).
ЛюбойПомощь будет принята с благодарностью!