Ошибка при создании этого образца потока данных GCP для потоковой передачи от Pubsub к Bigquery - PullRequest
2 голосов
/ 12 апреля 2019

Я пытаюсь построить следующий пример потоковой передачи Pub / Sub на BigQuery:

https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubSubToBigQuery.java

код:

<code>/*
 * Copyright (C) 2018 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.cloud.teleport.templates;

import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.ResourceUtils;
import com.google.cloud.teleport.util.ValueProviderUtils;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format
 * from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors
 * which occur in the transformation of the data or execution of the UDF will be output to a
 * separate errors table in BigQuery. The errors table will be created if it does not exist prior to
 * execution. Both output and error tables are specified by the user as template parameters.
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>The Pub/Sub topic exists.
 *   <li>The BigQuery output table exists.
 * </ul>
 *
 * <p><b>Example Usage</b>
 *
 * <pre>
 * # Set the pipeline vars
 * PROJECT_ID=PROJECT ID HERE
 * BUCKET_NAME=BUCKET NAME HERE
 * PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/pubsub-to-bigquery
 * USE_SUBSCRIPTION=true or false depending on whether the pipeline should read
 *                  from a Pub/Sub Subscription or a Pub/Sub Topic.
 *
 * # Set the runner
 * RUNNER=DataflowRunner
 *
 * # Build the template
 * mvn compile exec:java \
 * -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToBigQuery \
 * -Dexec.cleanupDaemonThreads=false \
 * -Dexec.args=" \
 * --project=${PROJECT_ID} \
 * --stagingLocation=${PIPELINE_FOLDER}/staging \
 * --tempLocation=${PIPELINE_FOLDER}/temp \
 * --templateLocation=${PIPELINE_FOLDER}/template \
 * --runner=${RUNNER}
 * --useSubscription=${USE_SUBSCRIPTION}
 * "
 *
 * # Execute the template
 * JOB_NAME=pubsub-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
 *
 * # Execute a pipeline to read from a Topic.
 * gcloud dataflow jobs run ${JOB_NAME} \
 * --gcs-location=${PIPELINE_FOLDER}/template \
 * --zone=us-east1-d \
 * --parameters \
 * "inputTopic=projects/${PROJECT_ID}/topics/input-topic-name,\
 * outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
 * outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
 *
 * # Execute a pipeline to read from a Subscription.
 * gcloud dataflow jobs run ${JOB_NAME} \
 * --gcs-location=${PIPELINE_FOLDER}/template \
 * --zone=us-east1-d \
 * --parameters \
 * "inputSubscription=projects/${PROJECT_ID}/subscriptions/input-subscription-name,\
 * outputTableSpec=${PROJECT_ID}:dataset-id.output-table,\
 * outputDeadletterTable=${PROJECT_ID}:dataset-id.deadletter-table"
 * 
* / открытый класс PubSubToBigQuery { / ** Журнал для вывода сообщений о состоянии. * / приватная статическая финальная Logger LOG = LoggerFactory.getLogger (PubSubToBigQuery.class); / ** Тег основного выхода для UDF. * / public static final TupleTag > UDF_OUT = new TupleTag > () {}; / ** Тег для основного вывода преобразования json. * / public static final TupleTag TRANSFORM_OUT = new TupleTag () {}; / ** Тег для вывода мертвых букв из udf. * / public static final TupleTag > UDF_DEADLETTER_OUT = new TupleTag > () {}; / ** Тег для вывода мертвой буквы преобразования строки json в таблицу. * / public static final TupleTag > TRANSFORM_DEADLETTER_OUT = new TupleTag > () {}; / ** Суффикс по умолчанию для таблиц ошибок, если таблица мертвых букв не указана. * / public static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records"; / ** Pubsub сообщение / строковый кодер для конвейера. * / public static final FailsafeElementCoder CODER = FailsafeElementCoder.of (PubsubMessageWithAttributesCoder.of (), StringUtf8Coder.of ()); / ** String / String Coder для FailsafeElement. * / public static final FailsafeElementCoder FAILSAFE_ELEMENT_CODER = FailsafeElementCoder.of (StringUtf8Coder.of (), StringUtf8Coder.of ()); / ** * Класс {@link Options} предоставляет пользовательские параметры выполнения, передаваемые исполнителем на * командная строка. * / Параметры открытого интерфейса расширяют PipelineOptions, JavascriptTextTransformerOptions { @Description («Спецификация таблицы для записи вывода в») ValueProvider getOutputTableSpec (); void setOutputTableSpec (ValueProvider value); @Description («Pub / Sub topic для чтения входных данных») ValueProvider getInputTopic (); void setInputTopic (ValueProvider value); @Описание( «Облачная публикация / подписка на подписку.» + «Название должно быть в формате» + "projects / <идентификатор проекта> / подписки / <имя подписки>.") ValueProvider getInputSubscription (); void setInputSubscription (ValueProvider value); @Описание( «Это определяет, будет ли шаблон читать из« + »подписку на паб / подписку или тему») @ Default.Boolean (ложь) Boolean getUseSubscription (); void setUseSubscription (логическое значение); @Описание( "Таблица недоставленных сообщений для вывода в BigQuery в <идентификатор проекта>: .

Ответы [ 2 ]

5 голосов
/ 12 апреля 2019

Класс, который вы пытаетесь создать, не может быть создан только из этого файла. Он ссылается на несколько других классов, которые появляются в самом хранилище, например, com.google.cloud.teleport.coders . В основных инструкциях репо говорится, что нужно построить весь проект с помощью команды mvn clean compile. Затем в инструкциях указывается команда, необходимая для создания и создания файла шаблона. Если вы собираетесь извлекать один из шаблонов отдельно, вам нужно будет включить внешние зависимости в файл pom, а также извлечь локальные зависимости, которые они построили. Операторы import должны указывать зависимости. Те, кто в com.google.cloud.teleport, все в этом же репо . Остальные ссылки будут указаны в основном pom.xml .

1 голос
/ 12 апреля 2019

Проверьте pom.xml из DataflowTemplates Github , похоже, что вам не хватает некоторых зависимостей.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...