Невозможно получить учетные данные приложения по умолчанию.бежать на месте - PullRequest
0 голосов
/ 27 сентября 2018

Сейчас я пытаюсь использовать приведенный ниже пример для извлечения данных из GCP Pub / Sub в DataFlow.

https://codelabs.developers.google.com/codelabs/cpb101-bigquery-dataflow-streaming/index.html?index=..%2F..%2Fnext17#5

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import avro.shaded.com.google.common.collect.Lists;
import com.google.auth.oauth2.GoogleCredentials;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

public class StreamDemoConsumer {

    public static interface MyOptions extends DataflowPipelineOptions {
        @Description("Output BigQuery table <project_id>:<dataset_id>.<table_id>")
        @Default.String("coexon-seoul-dev:ledger_data_set.ledger_data2")
        String getOutput();

        void setOutput(String s);

        @Description("Input topic")
        @Default.String("projects/coexon-seoul-dev/topics/trading")
        String getInput();

        void setInput(String s);
    }

    @SuppressWarnings("serial")
    public static void main(String[] args) throws IOException {

        MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

        options.setStreaming(true);
        Pipeline p = Pipeline.create(options);


        String topic = options.getInput();
        String output = options.getOutput();

        // Build the table schema for the output table.
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"));
        fields.add(new TableFieldSchema().setName("num_words").setType("INTEGER"));
        TableSchema schema = new TableSchema().setFields(fields);

        p //
                .apply("GetMessages", PubsubIO.readStrings().fromTopic(topic)) //
                .apply("window",
                        Window.into(SlidingWindows//
                                .of(Duration.standardMinutes(2))//
                                .every(Duration.standardSeconds(30)))) //
                .apply("WordsPerLine", ParDo.of(new DoFn<String, Integer>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        String line = c.element();
                        c.output(line.split(" ").length);
                    }
                }))//
                .apply("WordsInTimeWindow", Sum.integersGlobally().withoutDefaults()) //
                .apply("ToBQRow", ParDo.of(new DoFn<Integer, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        TableRow row = new TableRow();
                        row.set("timestamp", Instant.now().toString());
                        row.set("num_words", c.element());
                        c.output(row);
                    }
                })) //
                .apply(BigQueryIO.writeTableRows().to(output)//
                        .withSchema(schema)//
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

        p.run();
    }
}

Я запускаю этот код, используя следующую команду.

sh run_oncloud4.sh coexon-seoul-dev ledgerbucket

тогда код работает хорошо

run_oncloud4.sh как показано ниже

#!/bin/bash

if [ "$#" -ne 2 ]; then
   echo "Usage:   ./run_oncloud.sh project-name  bucket-name"
   echo "Example: ./run_oncloud.sh cloud-training-demos  cloud-training-demos"
   exit
fi

PROJECT=$1
BUCKET=$2
MAIN=com.google.cloud.training.dataanalyst.javahelp.StreamDemoConsumer

echo "project=$PROJECT  bucket=$BUCKET  main=$MAIN"

export PATH=/usr/lib/jvm/java-8-openjdk-amd64/bin/:$PATH
mvn compile -e exec:java \
 -Dexec.mainClass=$MAIN \
      -Dexec.args="--project=$PROJECT \
      --stagingLocation=gs://$BUCKET/staging/ \
      --tempLocation=gs://$BUCKET/staging/ \
      --output=$PROJECT:demos.streamdemo \
      --input=projects/$PROJECT/topics/streamdemo \
      --runner=DataflowRunner"

, но я запускаю верхний код, как показано ниже

sh run_locally.sh com.google.cloud.training.dataanalyst.javahelp.StreamDemoConsumer

, затем невозможнополучено сообщение об ошибке учетных данных приложения по умолчанию.

SLF4J: Не удалось загрузить класс "org.slf4j.impl.StaticLoggerBinder".SLF4J: По умолчанию используется логгер без операций (NOP). SLF4J: Подробнее см. http://www.slf4j.org/codes.html#StaticLoggerBinder.Исключение в потоке "main" java.lang.RuntimeException: невозможно получить учетные данные приложения по умолчанию.Пожалуйста, смотрите https://developers.google.com/accounts/docs/application-default-credentials для получения подробной информации о том, как указать учетные данные.Эта версия SDK зависит от версии базового компонента gcloud 2015.02.05 или более поздней, чтобы иметь возможность получать учетные данные от авторизованного пользователя через gcloud auth.по адресу org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer.throwNullCredentialException (NullCredentialInitializer.java:60) по адресу org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitialShindalHullIllSenseHullIllSenseHullIllSenseRuND53) на com.google.cloud.hadoop.util.ChainingHttpRequestInitializer $ 3.handleResponse (ChainingHttpRequestInitializer.java:111) на com.google.api.client.http.HttpRequest.execute (HttpRequest.java.g0).api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed (AbstractGoogleClientRequest.java:419) в com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed (AbstractGap.li.jpg).googleapis.services..gcp.bigquery. BigQueryServicesImpl.io.gcp.bigquery.BigQueryIO $ Write.validate (BigQueryIO.java:1486) в org.apache.beam.sdk.Pipeline $ ValidateVisitor.enterCompositeTransform (Pipeline.java:640) в org.apache.beam.sdk.runners.TransformHierarchy $ Node.visit (TransformHierarchy.java:656) в org.apache.beam.sdk.runners.TransformHierarchy $ Node.visit (TransformHierarchy.java:660) в org.apache.beam.sdk.runners.TransformHierarchy $ Node.access $ 600 (TransformHierarchy.java:311) в org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:245) в org.apache.beam.sdk.Pipeline.traverseTopological (Pipeline.java:458).в org.apache.beam.sdk.Pipeline.validate (Pipeline.java:575) в org.apache.beam.sdk.Pipeline.run (Pipeline.java:310) в org.apache.beam.sdk.Pipeline.run(Pipeline.java:297) на com.google.cloud.training.dataanalyst.javahelp.StreamDemoConsumer.main (StreamDemoConsumer.java:115)

Процесс завершен с кодом выхода 1

run_locally.sh

#!/bin/bash

if [ "$#" -ne 1 ]; then
   echo "Usage:   ./run_locally.sh mainclass-basename"
   echo "Example: ./run_oncloud.sh Grep"
   exit
fi

MAIN=com.google.cloud.training.dataanalyst.javahelp.$1

export PATH=/usr/lib/jvm/java-8-openjdk-amd64/bin/:$PATH
mvn compile -e exec:java -Dexec.mainClass=$MAIN

Iустановил учетные данные

echo ${GOOGLE_APPLICATION_CREDENTIALS}

/ Users / mattheu / coexon-seoul-dev-898d91a66539.json

, но произошла ошибка авторизации.

как я могу решить эту проблему?

...