Перезаписать некоторые разделы многораздельной таблицы Bigquery - PullRequest
0 голосов
/ 12 июня 2018

В настоящее время я пытаюсь разработать конвейер потока данных для замены некоторых разделов многораздельной таблицы.У меня есть настраиваемое поле раздела, которое является датой.Ввод моего конвейера - это файл с потенциально разными датами.

Я разработал конвейер:

    PipelineOptionsFactory.register(BigQueryOptions.class);
    BigQueryOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);

    Pipeline p = Pipeline.create(options);

    PCollection<TableRow> rows =  p.apply("ReadLines", TextIO.read().from(options.getFileLocation()))
            .apply("Convert To BQ Row", ParDo.of(new StringToRowConverter(options)));



    ValueProvider<String>  projectId = options.getProjectId();
    ValueProvider<String> datasetId = options.getDatasetId();
    ValueProvider<String> tableId = options.getTableId();
    ValueProvider<String> partitionField = options.getPartitionField();
    ValueProvider<String> columnNames = options.getColumnNames();
    ValueProvider<String> types = options.getTypes();

    rows.apply("Write to BQ", BigQueryIO.writeTableRows()
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withCustomGcsTempLocation(options.getGCSTempLocation())
            .to(new DynamicDestinations<TableRow, String>() {

                @Override
                public String getDestination(ValueInSingleWindow<TableRow> element) {

                    TableRow date = element.getValue();

                    String partitionDestination = (String) date.get(partitionField.get());

                    SimpleDateFormat from = new SimpleDateFormat("yyyy-MM-dd");
                    SimpleDateFormat to = new SimpleDateFormat("yyyyMMdd");

                    try {

                        partitionDestination = to.format(from.parse(partitionDestination));
                        LOG.info("Table destination "+partitionDestination);
                        return projectId.get()+":"+datasetId.get()+"."+tableId.get()+"$"+partitionDestination;

                    } catch(ParseException e){
                        e.printStackTrace();
                        return projectId.get()+":"+datasetId.get()+"."+tableId.get()+"_rowsWithErrors";
                    }
                }

                @Override
                public TableDestination getTable(String destination) {

                    TimePartitioning timePartitioning = new TimePartitioning();
                    timePartitioning.setField(partitionField.get());
                    timePartitioning.setType("DAY");
                    timePartitioning.setRequirePartitionFilter(true);

                    TableDestination tableDestination  = new TableDestination(destination, null, timePartitioning);

                    LOG.info(tableDestination.toString());

                    return tableDestination;

                }

                @Override
                public TableSchema getSchema(String destination) {

                        return new TableSchema().setFields(buildTableSchemaFromOptions(columnNames, types));
                }
            })
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    );

    p.run();
}

Когда я запускаю конвейер локально, он успешно заменяет разделы, дата которых находится во входном файле.Тем не менее, при развертывании в облачном потоке данных Google и запуске шаблона с точно такими же параметрами он усекает все данные, и в конце у меня просто есть файл, который я хотел загрузить в свою таблицу.

Знаете ли выпочему такая разница?

Спасибо!

1 Ответ

0 голосов
/ 11 июля 2018

Вы указали BigQueryIO.Write.CreateDisposition для CREATE_IF_NEEDED, и это связано с BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE, поэтому даже если таблица существует, она может быть воссоздана.Это причина, по которой вы видите замену вашей таблицы.

Подробнее см. В этом документе [1].

[1] https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/BigQueryIO.Write.CreateDisposition#CREATE_IF_NEEDED

...