Как получить поток данных Google для записи в имя таблицы BigQuery из входных данных? - PullRequest
0 голосов
/ 11 июня 2019

Я новичок в Dataflow / Beam. Я пытаюсь записать некоторые данные в BigQuery. Я хочу, чтобы имя таблицы назначения было перенесено с предыдущего этапа записи карты с ключом "таблица". Но я не смог выяснить, как передать это имя таблицы по конвейеру в BigQuery. Вот где я застрял .. есть идеи, что делать дальше?

pipeline
// ...
//////// I guess I shouldn't output TableRow here?
.apply("ToBQRow", ParDo.of(new DoFn<Map<String, String>, TableRow>() {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        ////////// WHAT DO I DO WITH "table"?
        String table = c.element().get("table");
        TableRow row = new TableRow();
        // ... set some records
        c.output(row);
    }
}))
.apply(BigQueryIO.writeTableRows().to(/* ///// WHAT DO I WRITE HERE?? */)
    .withSchema(schema)
    .withWriteDisposition(
        BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
));

1 Ответ

1 голос
/ 13 июня 2019

Для этого можно использовать DynamicDestination .

В качестве примера я создаю фиктивные данные и в качестве ключа буду использовать последнее слово:

p.apply("Create Data", Create.of("this should go to table one",
                                 "I would like to go to table one",
                                 "please, table one",
                                 "I prefer table two",
                                 "Back to one",
                                 "My fave is one",
                                 "Rooting for two"))
.apply("Create Keys", ParDo.of(new DoFn<String, KV<String,String>>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
      String[] splitBySpaces = c.element().split(" ");
      c.output(KV.of(splitBySpaces[splitBySpaces.length - 1],c.element()));
    }
  }))

и затем с помощью getDestination мы контролируем, как направить каждый элемент в отдельную таблицу в соответствии с ключом, и getTable, чтобы построить полное имя таблицы (с префиксом). Мы могли бы использовать getSchema, если разные таблицы имели разные схемы. Наконец, мы контролируем, что писать в таблице, используя withFormatFunction:

.apply(BigQueryIO.<KV<String, String>>write()
.to(new DynamicDestinations<KV<String, String>, String>() {
    public String getDestination(ValueInSingleWindow<KV<String, String>> element) {
        return element.getValue().getKey();
    }
    public TableDestination getTable(String name) {
      String tableSpec = output + name;
        return new TableDestination(tableSpec, "Table for type " + name);
  }
    public TableSchema getSchema(String schema) {
          List<TableFieldSchema> fields = new ArrayList<>();

      fields.add(new TableFieldSchema().setName("Text").setType("STRING"));
      TableSchema ts = new TableSchema();
      ts.setFields(fields);
      return ts;
    }
})
.withFormatFunction(new SerializableFunction<KV<String, String>, TableRow>() {
    public TableRow apply(KV<String, String> row) {
    TableRow tr = new TableRow();

    tr.set("Text", row.getValue());
    return tr;
    }
 })
 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Чтобы полностью проверить это, я создал следующие таблицы:

bq mk dynamic_key
bq mk -f dynamic_key.dynamic_one Text:STRING
bq mk -f dynamic_key.dynamic_two Text:STRING

И после задания переменных $PROJECT, $BUCKET и $TABLE_PREFIX (в моем случае PROJECT_ID:dynamic_key.dynamic_) я запускаю задание с:

mvn -Pdataflow-runner compile -e exec:java \
 -Dexec.mainClass=com.dataflow.samples.DynamicTableFromKey \
      -Dexec.args="--project=$PROJECT \
      --stagingLocation=gs://$BUCKET/staging/ \
      --tempLocation=gs://$BUCKET/temp/ \
      --output=$TABLE_PREFIX \
      --runner=DataflowRunner"

Мы можем убедиться, что каждый элемент попал в правильную таблицу:

$ bq query "SELECT * FROM dynamic_key.dynamic_one"
+---------------------------------+
|              Text               |
+---------------------------------+
| please, table one               |
| Back to one                     |
| My fave is one                  |
| this should go to table one     |
| I would like to go to table one |
+---------------------------------+
$ bq query "SELECT * FROM dynamic_key.dynamic_two"
+--------------------+
|        Text        |
+--------------------+
| I prefer table two |
| Rooting for two    |
+--------------------+

Полный код здесь .

...