GCloud Dataflow воссоздает таблицу BigQuery, если она удаляется во время выполнения задания - PullRequest
0 голосов
/ 21 марта 2020

Я настроил конвейер потока данных GCloud, который принимает сообщения из подписки Pub / Sub, преобразует их в строки таблицы и записывает эти строки в соответствующую таблицу BigQuery.

Назначение таблицы определяется на основе содержимого сообщения Pub / Sub и иногда приводит к ситуации, когда таблица еще не существует и должна быть создана в первую очередь. Для этого я использую create disposition CREATE_IF_NEEDED, который прекрасно работает.

Однако я заметил, что если я вручную удалю только что созданную таблицу в BigQuery, когда задание Dataflow еще выполняется, Dataflow застрянет и будет зависать. не воссоздавать таблицу. Вместо этого я получаю сообщение об ошибке:

Operation ongoing in step write-rows-to-bigquery/StreamingInserts/StreamingWriteTables/StreamingWrite for at least 05m00s without outputting or completing in state finish at sun.misc.Unsafe.park(Native Method) at
    java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at
    java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429) at
    java.util.concurrent.FutureTask.get(FutureTask.java:191) at
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:816) at
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:881) at
    org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:143) at
    org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:115) at
    org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn$DoFnInvoker.invokeFinishBundle(Unknown Source)

Если я go вернусь в BigQuery и заново создаю эту таблицу вручную, задание потока данных продолжит работать.

Однако мне интересно, есть ли способ проинструктировать конвейер потока данных для воссоздать таблицу, если она будет удалена во время выполнения задания ?

GCloud Dataflow pipeline

1 Ответ

1 голос
/ 22 марта 2020

Это невозможно в текущем BigqueryIO разъеме. Из github-ссылки присутствующего коннектора здесь вы увидите, что для StreamingWriteFn, каков ваш код, процесс создания таблицы выполняется в getOrCreateTable, и это называется в finishBundle. Существует карта createdTables, которая поддерживается, и в finishBundle таблица создается, если ее еще нет, после того, как она присутствует и сохраняется в хэш-карте, она не создается заново, как показано ниже: -

    public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
        throws IOException {
      TableReference tableReference = parseTableSpec(tableSpec);
      if (!createdTables.contains(tableSpec)) {
        synchronized (createdTables) {
          // Another thread may have succeeded in creating the table in the meanwhile, so
          // check again. This check isn't needed for correctness, but we add it to prevent
          // every thread from attempting a create and overwhelming our BigQuery quota.
          if (!createdTables.contains(tableSpec)) {
            TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
            Bigquery client = Transport.newBigQueryClient(options).build();
            BigQueryTableInserter inserter = new BigQueryTableInserter(client);
            inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
                CreateDisposition.CREATE_IF_NEEDED, tableSchema);
            createdTables.add(tableSpec);
          }
        }
      }
      return tableReference;
    }

Для того, чтобы удовлетворить ваши требования, вам, возможно, придется иметь свой собственный BigqueryIO, в котором вы не выполняете эту конкретную c проверку

if (!createdTables.contains(tableSpec)) {

Однако более важный вопрос заключается в том, почему таблица удаляется в производственной системе сама? Эта проблема должна быть исправлена, а не пытаться заново создать таблицу из потока данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...