Это невозможно в текущем BigqueryIO
разъеме. Из github-ссылки присутствующего коннектора здесь вы увидите, что для StreamingWriteFn
, каков ваш код, процесс создания таблицы выполняется в getOrCreateTable
, и это называется в finishBundle
. Существует карта createdTables
, которая поддерживается, и в finishBundle
таблица создается, если ее еще нет, после того, как она присутствует и сохраняется в хэш-карте, она не создается заново, как показано ниже: -
public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
throws IOException {
TableReference tableReference = parseTableSpec(tableSpec);
if (!createdTables.contains(tableSpec)) {
synchronized (createdTables) {
// Another thread may have succeeded in creating the table in the meanwhile, so
// check again. This check isn't needed for correctness, but we add it to prevent
// every thread from attempting a create and overwhelming our BigQuery quota.
if (!createdTables.contains(tableSpec)) {
TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
Bigquery client = Transport.newBigQueryClient(options).build();
BigQueryTableInserter inserter = new BigQueryTableInserter(client);
inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
CreateDisposition.CREATE_IF_NEEDED, tableSchema);
createdTables.add(tableSpec);
}
}
}
return tableReference;
}
Для того, чтобы удовлетворить ваши требования, вам, возможно, придется иметь свой собственный BigqueryIO, в котором вы не выполняете эту конкретную c проверку
if (!createdTables.contains(tableSpec)) {
Однако более важный вопрос заключается в том, почему таблица удаляется в производственной системе сама? Эта проблема должна быть исправлена, а не пытаться заново создать таблицу из потока данных.