Дублирует входные данные Apache Beam / Dataflow даже при использовании withIdAttribute - PullRequest
0 голосов
/ 28 мая 2018

Я пытаюсь вставить данные из стороннего API в конвейер потока данных.Так как сторонние поставщики не предоставляют доступ к веб-сайтам, я написал собственный скрипт, который постоянно опрашивает их конечную точку для получения дополнительных данных.

Данные обновляются каждые 15 минут, но так как я не хочу пропускать точки данныхи я хочу потреблять, как только появляются новые данные, мой «сканер» запускается каждую 1 минуту.Затем скрипт отправляет данные в тему PubSub.Легко видеть, что PubSub будет получать около 15 повторных сообщений для каждой точки данных в источнике.

Моя первая попытка идентифицировать и отбросить эти повторные сообщения состояла в том, чтобы добавить пользовательский атрибут к каждому сообщению PubSub (eventid),создан из хеша его [ID + updated_time] в источнике.

const attributes = {
         eventid: Buffer.from(`${item.lastupdate}|${item.segmentid}`).toString('base64'),
         timestamp: item.timestamp.toString()
      };
const dataBuffer = Buffer.from(JSON.stringify(item))
publisher.publish(dataBuffer, attributes)

Затем я настроил поток данных с withIdAttribute() (который является новым idLabel(), основанным на идентификаторы записи ).

PCollection<String> input = p
    .apply("ReadFromPubSub", PubsubIO
       .readStrings()
       .fromTopic(String.format("projects/%s/topics/%s", options.getProject(), options.getIncomingDataTopic()))
       .withTimestampAttribute("timestamp")
       .withIdAttribute("eventid"))
   .apply("OutputToBigQuery", ...)

С этой реализацией я ожидал, что когда сценарий отправит один и тот же объект данных во второй раз, повторный eventid будет таким же, а сообщение отброшено,Но по какой-то причине я все еще вижу дубликаты в выходном наборе данных.

Некоторые вопросы:

  1. Есть ли умный способ ввода данных в поток данных из стороннего API, если они не предоставляют веб-хуков?
  2. Есть идеи?почему поток данных не отбрасывает сообщения об этой ситуации?
    • Я знаю о 10-минутном ограничении дедупликации в потоке данных, но я вижу дублированные данные даже при 2-й вставке (2 минуты).

ЛюбойПомощь будет принята с благодарностью!

1 Ответ

0 голосов
/ 17 июля 2018

Я думаю, что вы на правильном пути, вместо хэша, который я рекомендую использовать временные метки.Лучший способ сделать это - использовать Windows.Просмотрите этот документ , который фильтрует данные, находящиеся за пределами окна.

Что касается дополнительных дублирующих данных, если вы используете подписки по запросу, и срок обработки подтверждения истекает до обработки данных, сообщение будет повторно отправлено в соответствии с как минимум однократной доставкой .В этом случае измените крайний срок подтверждения, значения по умолчанию - 10 секунд.

...