Я пытаюсь построить следующий пример потоковой передачи Pub / Sub на BigQuery:
https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubSubToBigQuery.java
код:
<code>/*
* Copyright (C) 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.ResourceUtils;
import com.google.cloud.teleport.util.ValueProviderUtils;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
* from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
* which occur in the transformation of the data or execution of the UDF will be output to a
* separate errors table in BigQuery. The errors table will be created if it does not exist prior to
* execution. Both output and error tables are specified by the user as template parameters.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The Pub/Sub topic exists.
* <li>The BigQuery output table exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* BUCKET_NAME=BUCKET NAME HERE
* PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/pubsub-to-bigquery
* USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
* from a Pub/Sub Subscription or a Pub/Sub Topic.
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToBigQuery \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER}
* --useSubscription=${USE_SUBSCRIPTION}
* "
*
* # Execute the template
* JOB_NAME=pubsub-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* # Execute a pipeline to read from a Topic.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
*
* # Execute a pipeline to read from a Subscription.
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
* outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
* outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
*
* /
открытый класс PubSubToBigQuery {
/ ** Журнал для вывода сообщений о состоянии. * /
приватная статическая финальная Logger LOG = LoggerFactory.getLogger (PubSubToBigQuery.class);
/ ** Тег основного выхода для UDF. * /
public static final TupleTag > UDF_OUT =
new TupleTag > () {};
/ ** Тег для основного вывода преобразования json. * /
public static final TupleTag TRANSFORM_OUT = new TupleTag () {};
/ ** Тег для вывода мертвых букв из udf. * /
public static final TupleTag > UDF_DEADLETTER_OUT =
new TupleTag > () {};
/ ** Тег для вывода мертвой буквы преобразования строки json в таблицу. * /
public static final TupleTag > TRANSFORM_DEADLETTER_OUT =
new TupleTag > () {};
/ ** Суффикс по умолчанию для таблиц ошибок, если таблица мертвых букв не указана. * /
public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
/ ** Pubsub сообщение / строковый кодер для конвейера. * /
public static final FailsafeElementCoder CODER =
FailsafeElementCoder.of (PubsubMessageWithAttributesCoder.of (), StringUtf8Coder.of ());
/ ** String / String Coder для FailsafeElement. * /
public static final FailsafeElementCoder FAILSAFE_ELEMENT_CODER =
FailsafeElementCoder.of (StringUtf8Coder.of (), StringUtf8Coder.of ());
/ **
* Класс {@link Options} предоставляет пользовательские параметры выполнения, передаваемые исполнителем на
* командная строка.
* /
Параметры открытого интерфейса расширяют PipelineOptions, JavascriptTextTransformerOptions {
@Description («Спецификация таблицы для записи вывода в»)
ValueProvider getOutputTableSpec ();
void setOutputTableSpec (ValueProvider value);
@Description («Pub / Sub topic для чтения входных данных»)
ValueProvider getInputTopic ();
void setInputTopic (ValueProvider value);
@Описание(
«Облачная публикация / подписка на подписку.»
+ «Название должно быть в формате»
+ "projects / <идентификатор проекта> / подписки / <имя подписки>.")
ValueProvider getInputSubscription ();
void setInputSubscription (ValueProvider value);
@Описание(
«Это определяет, будет ли шаблон читать из« + »подписку на паб / подписку или тему»)
@ Default.Boolean (ложь)
Boolean getUseSubscription ();
void setUseSubscription (логическое значение);
@Описание(
"Таблица недоставленных сообщений для вывода в BigQuery в <идентификатор проекта>: .