Как Dataflow работает с набором данных BIgQuery - PullRequest
0 голосов
/ 29 июня 2018

Я не нашел, как получить таблицы из указанного набора данных. Я хочу использовать Dataflow для переноса таблиц, начиная с набора данных США и заканчивая расположением набора данных ЕС. Я хотел бы получить все таблицы в параллельном процессе набора данных US и записать таблицы в набор данных EU.

Beam 2.4 использует com.google.api.services.bigquery v2-rev374-1.22.0. Это также библиотека, которую вы должны использовать с Beam 2.4.

Код успешно работает с DirectRunner, но если я запускаю с DataflowRunner не запускается и выдает ошибку

un 29, 2018 1:52:48 PM com.google.api.client.http.HttpRequest execute
ADVERTENCIA: exception thrown while executing request
java.net.SocketException: Unexpected end of file from server
        at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:851)
        at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1587)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:347)
        at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
        at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
        at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeUploadInitiation(MediaHttpUploader.java:519)
        at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:384)
        at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

jun 29, 2018 1:52:55 PM com.google.api.client.http.HttpRequest execute
ADVERTENCIA: exception thrown while executing request
java.net.SocketException: Unexpected end of file from server
        at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:851)
        at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1587)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:347)
        at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
        at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
        at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeUploadInitiation(MediaHttpUploader.java:519)
        at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:384)
        at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

пример кода

package emp.customerjourney.etls;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.*;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.*;


import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;


import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class Migracion {

    public static interface MyOptions extends DataflowPipelineOptions {
        @Description("BigQuery table to write to, specified as "+ "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
        @Default.String("customerjourney:prueba.weather_stations")
        @Validation.Required
        String getOutput();
        void setOutput(String s);

        @Description("Table to read from, specified as "+ "<project_id>:<dataset_id>.<table_id>")
        @Default.String("customerjourney:118678548.gsod3")
        String getInput();
        void setInput(String value);


    }


    public static Bigquery createAuthorizedClient() throws IOException {
        // Create the credential
        HttpTransport transport = new NetHttpTransport();
        JsonFactory jsonFactory = new JacksonFactory();
        GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);

        if (credential.createScopedRequired()) {
            credential = credential.createScoped(BigqueryScopes.all());
        }

        return new Bigquery.Builder(transport, jsonFactory, credential)
                .setApplicationName("Bigquery Samples")
                .build();
    }


    public static final void main(String args[]) throws Exception {


        String projectId = "customerjourney";
        String datasetName = "dsorigen";
        // Create a new Bigquery client authorized via Application Default Credentials.
        Bigquery bigquery = createAuthorizedClient();
        Bigquery.Tables.List lista=bigquery.tables().list(projectId,datasetName);
        TableList rp= lista.execute();
        List<TableList.Tables> tblista =rp.getTables();
        String  entrada=tblista.get(3).getId();

        MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

        options.setTempLocation("gs://pruebasg/teststaging");
        options.setRegion("europe-west1");
        options.setStagingLocation("gs://pruebasg/temp_dataflow_tasks");
        Pipeline p = Pipeline.create(options);

        // Build the table schema for the output table.
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
        TableSchema schema = new TableSchema().setFields(fields);

       // p.apply(BigQueryIO.readTableRows().from(options.getInput()))
        p.apply(BigQueryIO.readTableRows().from(entrada)) //get dataset name form api Bigquery V2
                .apply(new BigQueryTornadoes.CountTornadoes())
                .apply(BigQueryIO.writeTableRows()
                        .to(options.getOutput())
                        .withSchema(schema)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));



        p.run().waitUntilFinish();

        options.getExecutorService().shutdown();
        try {
            options.getExecutorService().awaitTermination(3, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            System.out.println("Thread was interrupted waiting for execution service to shutdown.");
        }
        System.out.println("termino");

    }

}

<dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.cloud.dataflow</groupId>
            <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
            <version>[2.4.0, 2.99)</version>
        </dependency>

        <!-- slf4j API frontend binding with JUL backend -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.14</version>
        </dependency>

       <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-jdk14</artifactId>
            <version>1.7.14</version>
        </dependency>

     <dependency>
            <groupId>com.google.apis</groupId>
            <artifactId>google-api-services-bigquery</artifactId>
            <version>${bigquery.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava-jdk5</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>22.0</version>
        </dependency>

              <dependency>
                  <groupId>com.google.oauth-client</groupId>
                  <artifactId>google-oauth-client</artifactId>
                  <version>1.21.0</version>
              </dependency>
              <dependency>
                  <groupId>com.google.http-client</groupId>
                  <artifactId>google-http-client-jackson2</artifactId>
                  <version>1.21.0</version>
              </dependency>
              <dependency>
                  <groupId>com.google.oauth-client</groupId>
                  <artifactId>google-oauth-client-jetty</artifactId>
                  <version>1.21.0</version>
              </dependency>
              <dependency>
                  <groupId>com.google.code.gson</groupId>
                  <artifactId>gson</artifactId>
                  <version>2.7</version>
              </dependency>
              <dependency>
                  <groupId>junit</groupId>
                  <artifactId>junit</artifactId>
                  <version>4.12</version>
                  <scope>test</scope>
              </dependency>
              <dependency>
                  <groupId>com.google.truth</groupId>
                  <artifactId>truth</artifactId>
                  <version>0.29</version>
                  <scope>test</scope>
              </dependency>


          </dependencies>

Для потока данных потребуется JSON-файл GOOGLE_APPLICATION_CREDENTIALS, чтобы выполнить этот код?

Я не нашел таблицы списка методов существующего набора данных с BigQuery IO. Справочный пост

Не могли бы вы мне помочь, пожалуйста? -

Ответы [ 2 ]

0 голосов
/ 29 октября 2018

Это ошибка подключения. Сначала я советую проверить, правильно ли вы устанавливаете свои учетные данные, как объясняется в других ответах, и, когда эта ошибка все еще появляется, выполните следующие действия:

Во-первых, эта ошибка указывает, что сокет TCP был закрыт до того, как сервер смог отправить ответ. Некоторые возможные причины:

  • Сетевое соединение потеряно
  • Сервер решил закрыть соединение
  • Что-то между клиентом и сервером прервало запрос

Подумайте об использовании таких инструментов, как netstat / traceroute / ping, чтобы посмотреть, сможете ли вы найти проблему на маршруте для случаев 1 и 3.

Если сервер является тем, который закрывает соединение, то ваш запрос может быть неверным или он душит вас во избежание перегрузки. Это те случаи, когда быстрые повторные попытки не помогают (в некоторых случаях, если на сервере одновременно возникают проблемы, повторная попытка помогает). Вы можете попробовать экспоненциальную стратегию отсрочки и повторных попыток, и если она не работает, то ваш запрос может быть неверным.

0 голосов
/ 21 июля 2018

Я написал комментарий о Unexpected end of file from server, не уверен, что с этим делать. Но, что касается вашего вопроса GOOGLE_APPLICATION_CREDENTIALS, вам понадобится аккаунт для аутентификации в потоке данных. Я использую свою учетную запись, аутентифицированную через gcloud auth login, но вы также можете использовать служебную учетную запись с gcloud auth activate-service-account --key-file=path/to/keyfile.json. Вы можете создать учетную запись службы через IAM. Подробности здесь .

...