Google Dataflow и Pubsub - не может быть достигнута однократная доставка - PullRequest
0 голосов
/ 20 сентября 2018

Я пытаюсь добиться точной однократной доставки с использованием Google Dataflow и PubSub с использованием Apache Beam SDK 2.6.0.

Вариант использования довольно прост:

отправляет задание потока данных «Генератор»Сообщения 1M в тему PubSub.

GenerateSequence
          .from(0)
          .to(1000000)
          .withRate(100000, Duration.standardSeconds(1L));

Задание потока данных «Архив» считывает сообщения из подписки PubSub и сохраняет их в облачном хранилище Google.

pipeline
        .apply("Read events",
            PubsubIO.readMessagesWithAttributes()
                // this is to achieve exactly-once delivery
                .withIdAttribute(ATTRIBUTE_ID)
                .fromSubscription('subscription')
                .withTimestampAttribute(TIMESTAMP_ATTRIBUTE))
        .apply("Window events",
            Window.<Dto>into(FixedWindows.of(Duration.millis(options.getWindowDuration())))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.standardMinutes(15))
                .discardingFiredPanes())
        .apply("Events count metric", ParDo.of(new CountMessagesMetric()))
        .apply("Write files to archive",
            FileIO.<String, Dto>writeDynamic()
                .by(Dto::getDataSource).withDestinationCoder(StringUtf8Coder.of())
                .via(Contextful.of((msg, ctx) -> msg.getData(), Requirements.empty()), TextIO.sink())
                .to(archiveDir)
                .withTempDirectory(archiveDir)
                .withNumShards(options.getNumShards())
                .withNaming(dataSource ->
                    new SyslogWindowedDataSourceFilenaming(dataSource, archiveDir, filenamePrefix, filenameSuffix)
                ));

Я добавил «withIdAttribute» в оба Pubsub.IO.Write (задание 'Generator') и PubsubIO.Read (задание 'Archive') и ожидаем, что он будет гарантировать семантику ровно один раз.

Я хотел бы протестировать сценарий "минус":

  1. Задание потока данных «Генератор» отправляет сообщения 1M в тему PubSub.
  2. Задание потока данных «Архив» начинает работать, но я останавливаю его в середине обработки, нажимая «Остановить задание» -> ».Слив.Некоторая часть сообщений была обработана и сохранена в облачном хранилище, скажем, 400K сообщений.
  3. Я снова запускаю задание «Архивировать» и ожидаю, что он примет необработанные сообщения (600 КБ), и в итоге я увижу ровно 1Mсообщения сохраняются в хранилище.

Что я получил на самом деле - все сообщения доставляются (по крайней мере, один раз достигается), но помимо этого есть много дубликатов - что-то по соседству30-50 КБ на 1М сообщений.

Есть ли какое-либо решение для доставки точно однократной доставки?

Ответы [ 2 ]

0 голосов
/ 25 апреля 2019

Поток данных не позволяет сохранять состояние при каждом запуске.Если вы используете Java, вы можете обновить работающий конвейер таким образом, чтобы он не потерял существующее состояние, позволяя выполнять дедупликацию между выпусками конвейера.

Если это не работаетдля вас вы можете архивировать сообщения таким образом, чтобы они кодировались ATTRIBUTE_ID, например. Spanner или GCS, используя это как имя файла.

0 голосов
/ 20 сентября 2018

Итак, я никогда не делал это сам, но рассуждая о вашей проблеме, я бы так к ней подходил ...

Мое решение немного запутанное, но мне не удалось найти другие способы достиженияэто без привлечения других внешних служб.Итак, здесь ничего не идет.

Вы могли бы иметь свой конвейер, считывающий как из pubsub, так и из GCS, а затем объединить их для дедупликации данных.Сложность в том, что один будет ограниченной коллекцией pCollection (GCS), а другой - неограниченной (pubsub).Вы можете добавить временные метки в ограниченную коллекцию и затем отобразить данные.На этом этапе вы можете отбросить данные GCS старше ~ 15 минут (продолжительность окна в вашей предыдущей реализации).Эти два шага (т.е. правильное добавление временных меток и удаление данных, которые, вероятно, достаточно стары, чтобы не создавать дубликаты), безусловно, являются самыми хитрыми частями.

Как только это будет решено, добавьте две pCollections и затем используйте GroupByKey для идентификатора, который является общим для обоих наборов данных.Это даст PCollection<KV<Long, Iterable<YOUR_DATUM_TYPE>>.Затем вы можете использовать дополнительный DoFn, который удаляет все, кроме первого элемента в результирующем Iterable, а также удаляет бокс KV <>.После этого вы можете просто продолжить обработку данных, как обычно.

Наконец, эта дополнительная работа должна быть необходима только для первого окна pubsub при перезапуске конвейера.После этого вам следует переназначить pCollection GCS для пустой коллекции pCollection, чтобы группа по ключу не выполняла слишком много дополнительной работы.

Дайте мне знать, что вы думаете, и может ли это сработать.Кроме того, если вы решили придерживаться этой стратегии, пожалуйста, оставьте свой пробег :).

...