Как распространять метаданные PubSub с помощью Apache Beam? - PullRequest
0 голосов
/ 10 января 2020

Контекст: у меня есть конвейер, который слушает pub sub, сообщение pubsub публикуется с помощью уведомления об изменении объекта из облачного хранилища Google. Конвейер обрабатывает файл, используя XmlIO, разделяя его, пока все хорошо.

Проблема заключается в следующем: в сообщении pubsub (и в объекте, хранящемся в облачном хранилище Google) у меня есть некоторые метаданные, которые я хотел бы объединить с данными из XmlIO, чтобы составить элементы, которые будет конвейер процесс, как я могу этого достичь?

Ответы [ 2 ]

1 голос
/ 13 января 2020

Вы можете создать собственное окно и windowfn, в котором хранятся метаданные из сообщения pubsub, которое вы хотите использовать позже для обогащения отдельных записей.

Ваш конвейер будет выглядеть следующим образом:

ReadFromPubsub -> Window.into(CopyMetadataToCustomWindowFn) -> ParDo(ExtractFilenameFromPubsubMessage) -> XmlIO -> ParDo(EnrichRecordsWithWindowMetadata) -> Window.into(FixedWindows.of(...))

Для начала вам нужно создать подкласс IntervalWindow , в котором хранятся нужные вам метаданные. После этого создайте подкласс WindowFn , где в # assign Windows (...) вы копируете метаданные из сообщения pubsub в созданный вами подкласс IntervalWindow. Примените ваш новый windowfn, используя преобразование Window.into (...) . Теперь каждая из записей, которые проходят через преобразование XmlIO, будет в вашем пользовательском windowfn, который содержит метаданные.

Для второго шага вам нужно извлечь соответствующее имя файла из сообщения pubsub для передачи в Преобразование XmlIO в качестве входных данных.

На третьем этапе вы хотите извлечь пользовательские метаданные из окна в ParDo / DoFn, который следует за XmlIO. Записи в XmlIO сохранят информацию о окнах, которая была передана через него (обратите внимание, что не все преобразования делают это, но почти все делают). Вы можете заявить, что вашему DoFn нужно, чтобы окно было передано вашему @ ProcessElement , например:

class EnrichRecordsWithWindowMetadata extends DoFn<...> {
  @ProcessElement
  public void processElement(@Element XmlRecord xmlRecord, MyCustomMetadataWindow metadataWindow) {
    ... enrich record with metadata on window ...
  }
}

Наконец, хорошая идея вернуться к одному из стандартных windowfns например, Исправлено Windows, поскольку метаданные в окне больше не актуальны.

1 голос
/ 10 января 2020

Вы можете использовать прямое паб / суб-уведомление из облачного хранилища Google вместо введения OCN в середине.

Google также предлагают использовать pub / sub. Если вы получили уведомление о публикации / подписке, вы можете получить в нем атрибуты сообщения.

data = request.get_json()

object_id = data['message']['attributes']['objectGeneration']
bucket_name = data['message']['attributes']['bucketId']
object_name = data['message']['attributes']['objectId']
...