как передать динамические параметры в конвейере потока данных облака Google - PullRequest
0 голосов
/ 06 июня 2018

Я написал код для ввода CSV-файла из GCS в BigQuery с жестко заданным ProjectID, набором данных, именем таблицы, GCS Temp & Staging location.

Я ищу код, который должен читать

  • ProjectID
  • Набор данных
  • Имя таблицы
  • Параметры местоположения GCS Temp & Staging

из BigQuery table(Dynamic parameters).

Код: -

public class DemoPipeline {

public static TableReference getGCDSTableReference() {
    TableReference ref = new TableReference();
    ref.setProjectId("myprojectbq");
    ref.setDatasetId("DS_Emp");
    ref.setTableId("emp");
    return ref;
}
static class TransformToTable extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {

        String input = c.element();

        String[] s = input.split(",");
        TableRow row = new TableRow();

        row.set("id", s[0]);
        row.set("name", s[1]);
        c.output(row);

    }
}
public interface MyOptions extends PipelineOptions {

    /*
     * Param
     * 
     */

}

public static void main(String[] args) {

    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
    options.setTempLocation("gs://demo-xxxxxx/temp");
    Pipeline p = Pipeline.create(options);

    PCollection<String> lines = p.apply("Read From Storage", TextIO.read().from("gs://demo-xxxxxx/student.csv"));

    PCollection<TableRow> rows = lines.apply("Transform To Table",ParDo.of(new TransformToTable()));

    rows.apply("Write To Table",BigQueryIO.writeTableRows().to(getGCDSTableReference())
            //.withSchema(BQTableSemantics.getGCDSTableSchema())
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

    p.run();
}
}

1 Ответ

0 голосов
/ 19 сентября 2018

Даже для чтения из исходной таблицы (ИД проекта / набора данных / имен таблиц), где содержатся другие данные, вам нужно где-то жестко закодировать такую ​​информацию.Файлы свойств, рекомендованные Харисом, являются хорошим подходом, обратите внимание на следующие предложения:

  1. Файл свойств Java .Используется, когда параметры должны быть изменены или настроены.В общем, изменения, которые не требуют новой компиляции.Это файл, который должен жить или прикрепляться к вашим Java-классам.Чтение этого файла из GCS возможно, но странный вариант.

  2. Параметры выполнения конвейера.Пользовательские параметры могут быть в качестве обходного пути для вашего вопроса, пожалуйста, отметьте Создание пользовательских параметров , чтобы понять, как это можно сделать, Вот небольшой пример .

...