Google App Engine cannot launch Dataflow jobs
0 votes
22 March 2019

Here is the error message that gets printed when the job fails. I am running the local dev server and visiting http://localhost:8080/dataflow/schedule, which triggers the doGet() call that launches the Dataflow pipeline. I am also using the App Engine default service account (@appspot.gserviceaccount.com) for credentials.

Here is my code that starts the job:

@WebServlet(name = "dataflowscheduler", value = "/dataflow/schedule")
public class DataflowSchedulingServlet extends HttpServlet {
    @Override
    public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {

        Properties properties = System.getProperties();

        try{
            String[] args = {
                    "--project=<project_name>",
                    "--runner=DataflowRunner",
                    "--stagingLocation=gs://<project_name>/temp/cronjobs",
                    "--maxNumWorkers=1",
                    "--gcpTempLocation=gs://<project_name>/temp/gcpTempLocation",
                    "--tempLocation=gs://<project_name>/temp/tempLocation",
                    "--driverClassName=org.postgresql.Driver",
                    "--connectionURL=jdbc:postgresql://example.com:port/production",
                    "--username=<username>",
                    "--password=<password>",
                    "--driverJars=gs://<project_name>/temp/postgresql-42.2.5.jar",
                    "--bigQueryLoadingTemporaryDirectory=gs://<project_name>/temp/",
                    "--connectionProperties='unicode=true&characterEncoding=UTF-8'",
                    "--query=SELECT * FROM public.view_relations",
                    "--datasetname=flyhomes_production",
                    "--schemaname=public",
                    "--tablename=view_relations",
                    "--outputTable=<project_name>:<dataset>.view_relations",
                    "--dataflowJobFile=/Users/annasun/GoogleCloudProject/postgres-to-bigquery/out.json"};

            JdbcToBigQuery.main(args);

            System.out.println("STARTJOB() !!! ");

        } catch (InterruptedException e) {
            response.getWriter().println( "Exception: " + Arrays.toString(e.getStackTrace()));
        }

        response.setContentType("text/plain");
        response.getWriter().println("Hello App Engine - Standard using "
             //   + SystemProperty.version.get()
                + " Java " + properties.get("java.specification.version"));

    }
}

And here is the main function:

public static void main(String[] args) throws IOException, InterruptedException {
    System.out.println("HelloWorld()!" );

    // Parse the user options passed from the command-line
    JdbcConverters.JdbcToBigQueryOptions options =
            PipelineOptionsFactory.fromArgs(args)
                    .withValidation()
                    .as(JdbcConverters.JdbcToBigQueryOptions.class);

    String datasetName = options.getDatasetname().toString();
    String jobName = options.getJobName();
    // Dataflow job names cannot contain underscores, so swap them for hyphens
    // before appending the table name to the job name.
    String tableName = options.getTablename().toString().replace("_", "-");
    jobName = jobName + "-" + tableName;
    options.setJobName(jobName);

    System.out.println("run_updateTable_production(options)");
    run_updateTable_production(options);
    System.out.println("AFTER -- run_updateTable_production(options)");
}


private static void run_updateTable_production(JdbcConverters.JdbcToBigQueryOptions options)
        throws InterruptedException{

    Timestamp lastUpdatedTime = SchemaCreator.getLastUpdatedTimeFromBigQuery(options);
    System.out.println("LAST UPDATED TIME IS " + lastUpdatedTime);

     if (lastUpdatedTime != null) {
         System.out.println("!! LAST UPDATED TIME IS " + lastUpdatedTime);

         String query_base = options.getQuery().toString();
         String query_update = query_base + " WHERE updated_at > '" + lastUpdatedTime
                 + "' ORDER BY updated_at, id ";
         String jobName = options.getJobName();
       //  select * from public.listings WHERE updated_at > lastUpdatedTime
       //  ORDER BY updated_at, id OFFSET 100 LIMIT 50

         options.setQuery(ValueProvider.StaticValueProvider.of(query_update));
         System.out.println("QUERY IS : " + options.getQuery());
         options.setJobName(jobName + "-UPDATE-"
                 + lastUpdatedTime.toString().replace(":", "-").replace(".", "-"));
         System.out.println(jobName + "-UPDATE-"
                 + lastUpdatedTime.toString().replace(":", "-").replace(".", "-"));
         run(options);

     } else {
         run_createTable_Recursive(options);
     }

    System.out.println("FINISHED -- run_updateTable_production(options) ");
}



/**
 * Runs the pipeline with the supplied options.
 *
 * @param options The execution parameters to the pipeline.
 * @return The result of the pipeline execution.
 */
private static PipelineResult run(JdbcConverters.JdbcToBigQueryOptions options) {
    System.out.println("BEFORE Pipeline.create!!!!");
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    System.out.println("AFTER Pipeline.create!!!!");

    /*
     * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
     *        2) Append TableRow to BigQuery via BigQueryIO
     */
    pipeline
            /*
             * Step 1: Read records via JDBC and convert to TableRow
             *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
             */
            .apply(
                    "Read from JdbcIO",
                    DynamicJdbcIO.<TableRow>read()
                            .withDataSourceConfiguration(
                                    DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                                            options.getDriverClassName(), options.getConnectionURL())
                                            .withUsername(options.getUsername())
                                            .withPassword(options.getPassword())
                                            .withDriverJars(options.getDriverJars())
                                            .withConnectionProperties(options.getConnectionProperties()))
                            .withQuery(options.getQuery())
                            .withCoder(TableRowJsonCoder.of())
                            .withRowMapper(JdbcConverters.getResultSetToTableRow()))
            /*
             * Step 2: Append TableRow to an existing BigQuery table
             */
            .apply(
                    "Write to BigQuery",
                    BigQueryIO.writeTableRows()
                            .withoutValidation()
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                            .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                            .to(options.getOutputTable()));

    System.out.println("AFTER Pipeline.APPLY!!!!");
    // Execute the pipeline and return the result.
    return pipeline.run();
}

However, I got a server error:

HTTP ERROR 500 Problem accessing /dataflow/schedule.

Server Error
Caused by:

    java.lang.RuntimeException: Error while staging packages
            at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:398)
            at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:271)
            at org.apache.beam.runners.dataflow.util.GcsStager.stageFiles(GcsStager.java:80)
            at org.apache.beam.runners.dataflow.util.GcsStager.stageDefaultFiles(GcsStager.java:68)
            at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:713)
            at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:179)
            at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
            at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
            at com.flyhomes.cloud.dataflow.JdbcToBigQuery.run(JdbcToBigQuery.java:258)
            at com.flyhomes.cloud.dataflow.JdbcToBigQuery.run_updateTable_production(JdbcToBigQuery.java:140)
            at com.flyhomes.cloud.dataflow.JdbcToBigQuery.main(JdbcToBigQuery.java:104)
            at com.flyhomes.cloud.dataflow.DataflowSchedulingServlet.doGet(DataflowSchedulingServlet.java:64)
            at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
            at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
            at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:865)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1655)
            at com.google.appengine.tools.development.ResponseRewriterFilter.doFilter(ResponseRewriterFilter.java:134)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
            at com.google.appengine.tools.development.HeaderVerificationFilter.doFilter(HeaderVerificationFilter.java:34)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
            at com.google.appengine.api.blobstore.dev.ServeBlobFilter.doFilter(ServeBlobFilter.java:63)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
            at com.google.apphosting.utils.servlet.TransactionCleanupFilter.doFilter(TransactionCleanupFilter.java:48)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
            at com.google.appengine.tools.development.jetty9.StaticFileFilter.doFilter(StaticFileFilter.java:123)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
            at com.google.appengine.tools.development.DevAppServerModulesFilter.doDirectRequest(DevAppServerModulesFilter.java:366)
            at com.google.appengine.tools.development.DevAppServerModulesFilter.doDirectModuleRequest(DevAppServerModulesFilter.java:349)
            at com.google.appengine.tools.development.DevAppServerModulesFilter.doFilter(DevAppServerModulesFilter.java:116)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
            at com.google.appengine.tools.development.DevAppServerRequestLogFilter.doFilter(DevAppServerRequestLogFilter.java:44)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1634)
            at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:533)
            at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:146)
            at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:524)
            at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
            at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:257)
            at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1595)
            at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
            at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1317)
            at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
            at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
            at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1564)
            at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
            at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1219)
            at com.google.appengine.tools.development.jetty9.DevAppEngineWebAppContext.doScope(DevAppEngineWebAppContext.java:94)
            at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
            at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
            at com.google.appengine.tools.development.jetty9.JettyContainerService$ApiProxyHandler.handle(JettyContainerService.java:595)
            at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
            at org.eclipse.jetty.server.Server.handle(Server.java:531)
            at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:352)
            at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
            at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281)
            at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102)
            at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
            at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
            at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
            at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
            at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:132)
            at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
            at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.NullPointerException
            at com.google.appengine.tools.development.RequestEndListenerHelper.getListeners(RequestEndListenerHelper.java:52)
            at com.google.appengine.tools.development.RequestEndListenerHelper.register(RequestEndListenerHelper.java:39)
            at com.google.appengine.tools.development.RequestThreadFactory$1$1.start(RequestThreadFactory.java:65)
            at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
            at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1357)
            at org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.util.concurrent.MoreExecutors$ListeningDecorator.execute(MoreExecutors.java:530)
            at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
            at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
            at org.apache.beam.sdk.util.MoreFutures.supplyAsync(MoreFutures.java:101)
            at org.apache.beam.runners.dataflow.util.PackageUtil.stagePackage(PackageUtil.java:170)
            at org.apache.beam.runners.dataflow.util.PackageUtil.lambda$stageClasspathElements$2(PackageUtil.java:359)
            at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
            at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
            at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
            at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
            at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
            at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
            at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

And here is the error message from the log:

{
 "error": {
  "errors": [
   {
    "domain": "global",
    "reason": "required",
    "message": "Anonymous caller does not have storage.buckets.list access to project <project_number>.",
    "locationType": "header",
    "location": "Authorization"
   }
  ],
  "code": 401,
  "message": "Anonymous caller does not have storage.buckets.list access to project <project_number>."
 }
}

1 Answer

1 vote
30 March 2019

The error you are seeing means that your App Engine instance needs to run as a specific user or service account, and that account must be granted the storage.buckets.list permission. Most likely the default service account being used does not have it; instructions can be found here. I also recommend first running the pipeline outside of App Engine and confirming it completes successfully. It may also be easier to create a templated pipeline and launch it that way.
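
As a related note, if relying on Application Default Credentials keeps producing the anonymous-caller error on the dev server, you can hand Beam an explicit credential. A minimal sketch, assuming a service-account key file whose account has the needed storage permissions (the key path and class name below are illustrative, not from your project):

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Collections;

import com.google.auth.oauth2.GoogleCredentials;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ExplicitCredentialsSketch {
    public static void main(String[] args) throws IOException {
        // Load a service-account key whose account has storage.buckets.list
        // (the path here is hypothetical).
        GoogleCredentials credentials = GoogleCredentials
                .fromStream(new FileInputStream("/path/to/service-account-key.json"))
                .createScoped(Collections.singletonList(
                        "https://www.googleapis.com/auth/cloud-platform"));

        GcpOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(GcpOptions.class);
        // Give Beam the credential explicitly so staging files to GCS is not
        // attempted as an anonymous caller.
        options.setGcpCredential(credentials);
        // ...then cast to JdbcToBigQueryOptions and run the pipeline as before.
    }
}

Alternatively, exporting GOOGLE_APPLICATION_CREDENTIALS to point at the key file before starting the dev server has the same effect without code changes.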

Also, FYI, it may be worth double-checking everything in this post as well:

This blog shows how to launch a templated Dataflow job; to run it as a cron job, you would just need to trigger it from an RPC. Those instructions should at least help with most of the setup.

https://amygdala.github.io/dataflow/app_engine/2017/10/24/gae_dataflow.html
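
For illustration, launching a pre-staged template from Java with the Dataflow API client might look roughly like the sketch below. The template path, job name, and application name are placeholders, and it assumes the template was previously created with --templateLocation:

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Collections;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
import com.google.api.services.dataflow.model.LaunchTemplateResponse;
import com.google.api.services.dataflow.model.RuntimeEnvironment;

public class TemplateLauncherSketch {
    public static void main(String[] args) throws IOException, GeneralSecurityException {
        // Application Default Credentials; on App Engine this resolves to the
        // app's service account, which must be allowed to launch Dataflow jobs.
        GoogleCredential credential = GoogleCredential.getApplicationDefault()
                .createScoped(Collections.singleton(
                        "https://www.googleapis.com/auth/cloud-platform"));

        Dataflow dataflow = new Dataflow.Builder(
                GoogleNetHttpTransport.newTrustedTransport(),
                JacksonFactory.getDefaultInstance(),
                credential)
                .setApplicationName("jdbc-to-bigquery-scheduler") // hypothetical name
                .build();

        RuntimeEnvironment environment = new RuntimeEnvironment()
                .setTempLocation("gs://<project_name>/temp/gcpTempLocation")
                .setMaxWorkers(1);

        LaunchTemplateParameters launchParameters = new LaunchTemplateParameters()
                .setJobName("jdbc-to-bigquery-cron") // hypothetical job name
                .setEnvironment(environment);

        // gcsPath points at a template previously staged with --templateLocation.
        LaunchTemplateResponse response = dataflow.projects()
                .templates()
                .launch("<project_name>", launchParameters)
                .setGcsPath("gs://<project_name>/templates/JdbcToBigQuery")
                .execute();

        System.out.println("Launched job: " + response.getJob().getId());
    }
}

The nice part of this approach is that the App Engine request only makes a lightweight API call; all classpath staging happens once, when the template is created, which sidesteps the staging failure above entirely.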

...