Для этого можно использовать DynamicDestination .
В качестве примера я создаю фиктивные данные и в качестве ключа буду использовать последнее слово:
p.apply("Create Data", Create.of("this should go to table one",
"I would like to go to table one",
"please, table one",
"I prefer table two",
"Back to one",
"My fave is one",
"Rooting for two"))
.apply("Create Keys", ParDo.of(new DoFn<String, KV<String,String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
String[] splitBySpaces = c.element().split(" ");
c.output(KV.of(splitBySpaces[splitBySpaces.length - 1],c.element()));
}
}))
и затем с помощью getDestination
мы контролируем, как направить каждый элемент в отдельную таблицу в соответствии с ключом, и getTable
, чтобы построить полное имя таблицы (с префиксом). Мы могли бы использовать getSchema
, если разные таблицы имели разные схемы. Наконец, мы контролируем, что писать в таблице, используя withFormatFunction
:
.apply(BigQueryIO.<KV<String, String>>write()
.to(new DynamicDestinations<KV<String, String>, String>() {
public String getDestination(ValueInSingleWindow<KV<String, String>> element) {
return element.getValue().getKey();
}
public TableDestination getTable(String name) {
String tableSpec = output + name;
return new TableDestination(tableSpec, "Table for type " + name);
}
public TableSchema getSchema(String schema) {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("Text").setType("STRING"));
TableSchema ts = new TableSchema();
ts.setFields(fields);
return ts;
}
})
.withFormatFunction(new SerializableFunction<KV<String, String>, TableRow>() {
public TableRow apply(KV<String, String> row) {
TableRow tr = new TableRow();
tr.set("Text", row.getValue());
return tr;
}
})
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
Чтобы полностью проверить это, я создал следующие таблицы:
bq mk dynamic_key
bq mk -f dynamic_key.dynamic_one Text:STRING
bq mk -f dynamic_key.dynamic_two Text:STRING
И после задания переменных $PROJECT
, $BUCKET
и $TABLE_PREFIX
(в моем случае PROJECT_ID:dynamic_key.dynamic_
) я запускаю задание с:
mvn -Pdataflow-runner compile -e exec:java \
-Dexec.mainClass=com.dataflow.samples.DynamicTableFromKey \
-Dexec.args="--project=$PROJECT \
--stagingLocation=gs://$BUCKET/staging/ \
--tempLocation=gs://$BUCKET/temp/ \
--output=$TABLE_PREFIX \
--runner=DataflowRunner"
Мы можем убедиться, что каждый элемент попал в правильную таблицу:
$ bq query "SELECT * FROM dynamic_key.dynamic_one"
+---------------------------------+
| Text |
+---------------------------------+
| please, table one |
| Back to one |
| My fave is one |
| this should go to table one |
| I would like to go to table one |
+---------------------------------+
$ bq query "SELECT * FROM dynamic_key.dynamic_two"
+--------------------+
| Text |
+--------------------+
| I prefer table two |
| Rooting for two |
+--------------------+
Полный код здесь .