Поток данных читается из topi c PubSub и записывается в Bigquery (несколько таблиц) - PullRequest
0 голосов
/ 18 марта 2020

есть кто-то, кто использовал DynamicDestination в Dataflow, у которого есть простой и описанный пример. Мне надоело видеть пример телепорта в git (https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/DLPTextToBigQueryStreaming.java), мне больно быть новичком в apache Beam. Кстати, мне нужно прочитать сообщение из Pubsub и через задание потока данных записать в разные места назначения (таблицы) в наборе данных BigQuery. У меня есть собственный проект, который идеально подходит для таблицы Bigquery, но Pubsub topi c будет содержать несколько пунктов назначения из одного и того же набора данных. Кроме того, сообщение имеет формат JSON и содержит поле с именем таблицы назначения.

Это мой наиболее представительный код

TopicToBigQueryOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(TopicToBigQueryOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(Constants.READ_PUBSUB, PubsubIO.readStrings().fromSubscription(options.getInputSubscription()))
         .apply(Constants.LINE_TO_CHAMP, new PubSubToTableRowTransform())
         .apply(Constants.WRITE_CHAMPBAN, BigQueryIO.writeTableRows()
                .to(options.getTableStagingFileLines())
                .withSchema(AmplaChangeLogSchema.getTableSchema())
                .withCreateDisposition(CREATE_IF_NEEDED)
             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

Любое предложение?

С наилучшими пожеланиями

1 Ответ

0 голосов
/ 19 марта 2020

Как я уже упоминал в комментарии, первоначальный автор ( @ Ryan McDowell ) объясняет почти тот же сценарий пользователя, потребляя полезные нагрузки JSON из GCP Pub / Sub очередь сообщений, динамическая маршрутизация c к таблицам Bigquery, извлечение определенного имени таблицы в указанном атрибуте c из сообщения Pub / Sub.

В конвейере из примера мы видим метод getTableDestination(), унаследованный от DynamicDestination класса, который используется для извлечения определенного атрибута (tableNameAttr) из в сообщении, которое содержит имя таблицы Bigquery, наконец идентифицируя целевой объект TableDestination () .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...