Существует несоответствие типов. Обратите внимание, что элемент TableRow
анализируется в KV<Integer, String>
in MapElements
(т. Е. Ключ является Integer
). Затем на шаге записи ожидается ключ String
, как в .apply(FileIO.<String, KV<String, String>>writeDynamic()
:
inputRows.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
.via(tableRow -> KV.of((Integer) tableRow.get("id"),(String) tableRow.get("string1"))))
.apply(FileIO.<String, KV<String, String>>writeDynamic()
.by(KV::getKey)
...
Чтобы избежать повторного приведения ключа при использовании .by(KV::getKey)
, я бы порекомендовал привести его как String
before:
inputRows
.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via(tableRow -> KV.of((String) tableRow.get("id"),(String) tableRow.get("name"))))
.apply(FileIO.<String, KV<String, String>>writeDynamic()
.by(KV::getKey)
В качестве примера я проверил это с общедоступной таблицей bigquery-public-data:london_bicycles.cycle_stations
, где я записываю каждую велосипедную станцию в отдельный файл:
$ cat output/file-746-00000-of-00004.txt
Lots Road, West Chelsea
$ bq query --use_legacy_sql=false "SELECT name FROM \`bigquery-public-data.london_bicycles.cycle_stations\` WHERE id = 746"
Waiting on bqjob_<ID> ... (0s) Current status: DONE
+-------------------------+
| name |
+-------------------------+
| Lots Road, West Chelsea |
+-------------------------+
Полный код:
package com.dataflow.samples;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
public abstract class DynamicGCSWrites {
public interface Options extends PipelineOptions {
@Validation.Required
@Description("Output Path i.e. gs://BUCKET/path/to/output/folder")
String getOutput();
void setOutput(String s);
}
public static void main(String[] args) {
DynamicGCSWrites.Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DynamicGCSWrites.Options.class);
Pipeline p = Pipeline.create(options);
String output = options.getOutput();
PCollection<TableRow> inputRows = p
.apply(BigQueryIO.readTableRows()
.from("bigquery-public-data:london_bicycles.cycle_stations")
.withMethod(Method.DIRECT_READ)
.withSelectedFields(Lists.newArrayList("id", "name")));
inputRows
.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via(tableRow -> KV.of((String) tableRow.get("id"),(String) tableRow.get("name"))))
.apply(FileIO.<String, KV<String, String>>writeDynamic()
.by(KV::getKey)
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(KV::getValue), TextIO.sink())
.to(output)
.withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));
p.run().waitUntilFinish();
}
}