Сейчас я пытаюсь использовать приведенный ниже пример для извлечения данных из GCP Pub / Sub в DataFlow.
https://codelabs.developers.google.com/codelabs/cpb101-bigquery-dataflow-streaming/index.html?index=..%2F..%2Fnext17#5
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import avro.shaded.com.google.common.collect.Lists;
import com.google.auth.oauth2.GoogleCredentials;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
public class StreamDemoConsumer {
public static interface MyOptions extends DataflowPipelineOptions {
@Description("Output BigQuery table <project_id>:<dataset_id>.<table_id>")
@Default.String("coexon-seoul-dev:ledger_data_set.ledger_data2")
String getOutput();
void setOutput(String s);
@Description("Input topic")
@Default.String("projects/coexon-seoul-dev/topics/trading")
String getInput();
void setInput(String s);
}
@SuppressWarnings("serial")
public static void main(String[] args) throws IOException {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
options.setStreaming(true);
Pipeline p = Pipeline.create(options);
String topic = options.getInput();
String output = options.getOutput();
// Build the table schema for the output table.
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("num_words").setType("INTEGER"));
TableSchema schema = new TableSchema().setFields(fields);
p //
.apply("GetMessages", PubsubIO.readStrings().fromTopic(topic)) //
.apply("window",
Window.into(SlidingWindows//
.of(Duration.standardMinutes(2))//
.every(Duration.standardSeconds(30)))) //
.apply("WordsPerLine", ParDo.of(new DoFn<String, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String line = c.element();
c.output(line.split(" ").length);
}
}))//
.apply("WordsInTimeWindow", Sum.integersGlobally().withoutDefaults()) //
.apply("ToBQRow", ParDo.of(new DoFn<Integer, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableRow row = new TableRow();
row.set("timestamp", Instant.now().toString());
row.set("num_words", c.element());
c.output(row);
}
})) //
.apply(BigQueryIO.writeTableRows().to(output)//
.withSchema(schema)//
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
p.run();
}
}
Я запускаю этот код, используя следующую команду.
sh run_oncloud4.sh coexon-seoul-dev ledgerbucket
тогда код работает хорошо
run_oncloud4.sh как показано ниже
#!/bin/bash
if [ "$#" -ne 2 ]; then
echo "Usage: ./run_oncloud.sh project-name bucket-name"
echo "Example: ./run_oncloud.sh cloud-training-demos cloud-training-demos"
exit
fi
PROJECT=$1
BUCKET=$2
MAIN=com.google.cloud.training.dataanalyst.javahelp.StreamDemoConsumer
echo "project=$PROJECT bucket=$BUCKET main=$MAIN"
export PATH=/usr/lib/jvm/java-8-openjdk-amd64/bin/:$PATH
mvn compile -e exec:java \
-Dexec.mainClass=$MAIN \
-Dexec.args="--project=$PROJECT \
--stagingLocation=gs://$BUCKET/staging/ \
--tempLocation=gs://$BUCKET/staging/ \
--output=$PROJECT:demos.streamdemo \
--input=projects/$PROJECT/topics/streamdemo \
--runner=DataflowRunner"
, но я запускаю верхний код, как показано ниже
sh run_locally.sh com.google.cloud.training.dataanalyst.javahelp.StreamDemoConsumer
, затем невозможнополучено сообщение об ошибке учетных данных приложения по умолчанию.
SLF4J: Не удалось загрузить класс "org.slf4j.impl.StaticLoggerBinder".SLF4J: По умолчанию используется логгер без операций (NOP). SLF4J: Подробнее см. http://www.slf4j.org/codes.html#StaticLoggerBinder.Исключение в потоке "main" java.lang.RuntimeException: невозможно получить учетные данные приложения по умолчанию.Пожалуйста, смотрите https://developers.google.com/accounts/docs/application-default-credentials для получения подробной информации о том, как указать учетные данные.Эта версия SDK зависит от версии базового компонента gcloud 2015.02.05 или более поздней, чтобы иметь возможность получать учетные данные от авторизованного пользователя через gcloud auth.по адресу org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer.throwNullCredentialException (NullCredentialInitializer.java:60) по адресу org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitialShindalHullIllSenseHullIllSenseHullIllSenseRuND53) на com.google.cloud.hadoop.util.ChainingHttpRequestInitializer $ 3.handleResponse (ChainingHttpRequestInitializer.java:111) на com.google.api.client.http.HttpRequest.execute (HttpRequest.java.g0).api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed (AbstractGoogleClientRequest.java:419) в com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed (AbstractGap.li.jpg).googleapis.services..gcp.bigquery. BigQueryServicesImpl.io.gcp.bigquery.BigQueryIO $ Write.validate (BigQueryIO.java:1486) в org.apache.beam.sdk.Pipeline $ ValidateVisitor.enterCompositeTransform (Pipeline.java:640) в org.apache.beam.sdk.runners.TransformHierarchy $ Node.visit (TransformHierarchy.java:656) в org.apache.beam.sdk.runners.TransformHierarchy $ Node.visit (TransformHierarchy.java:660) в org.apache.beam.sdk.runners.TransformHierarchy $ Node.access $ 600 (TransformHierarchy.java:311) в org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:245) в org.apache.beam.sdk.Pipeline.traverseTopological (Pipeline.java:458).в org.apache.beam.sdk.Pipeline.validate (Pipeline.java:575) в org.apache.beam.sdk.Pipeline.run (Pipeline.java:310) в org.apache.beam.sdk.Pipeline.run(Pipeline.java:297) на com.google.cloud.training.dataanalyst.javahelp.StreamDemoConsumer.main (StreamDemoConsumer.java:115)
Процесс завершен с кодом выхода 1
run_locally.sh
#!/bin/bash
if [ "$#" -ne 1 ]; then
echo "Usage: ./run_locally.sh mainclass-basename"
echo "Example: ./run_oncloud.sh Grep"
exit
fi
MAIN=com.google.cloud.training.dataanalyst.javahelp.$1
export PATH=/usr/lib/jvm/java-8-openjdk-amd64/bin/:$PATH
mvn compile -e exec:java -Dexec.mainClass=$MAIN
Iустановил учетные данные
echo ${GOOGLE_APPLICATION_CREDENTIALS}
/ Users / mattheu / coexon-seoul-dev-898d91a66539.json
, но произошла ошибка авторизации.
как я могу решить эту проблему?