Apache Beam: запись значений ключа, пары значений в файлы в соответствии с ключом - PullRequest
0 голосов
/ 06 ноября 2019

Я хочу записать значения из ключа, пары значений в текстовые файлы в GCS по ключу, используя FileIO с writeDynamic() в Apache Beam (используя Java).

Пока что ячтение данных из Big Query, преобразование их в ключ, пары значений и затем попытка использовать FileIO с writeDynamic() для записи значений в один файл на ключ.

PCollection<TableRow> inputRows = p.apply(BigQueryIO.readTableRows()
    .from(tableSpec)
    .withMethod(Method.DIRECT_READ)
    .withSelectedFields(Lists.newArrayList("id", "string1", "string2", "string3", "int1")));

inputRows.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
    .via(tableRow -> KV.of((Integer) tableRow.get("id"),(String) tableRow.get("string1"))))
    .apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(Contextful.fn(KV::getValue), TextIO.sink())
    .to("gs://bucket/output")
    .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));

Я получаю ошибка:

The method apply
  (PTransform<? super PCollection<KV<Integer,String>>,OutputT>) 
  in the type PCollection<KV<Integer,String>> 
  is not applicable for the arguments 
  (FileIO.Write<String,KV<String,String>>)

1 Ответ

0 голосов
/ 07 ноября 2019

Существует несоответствие типов. Обратите внимание, что элемент TableRow анализируется в KV<Integer, String> in MapElements (т. Е. Ключ является Integer). Затем на шаге записи ожидается ключ String, как в .apply(FileIO.<String, KV<String, String>>writeDynamic():

inputRows.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
    .via(tableRow -> KV.of((Integer) tableRow.get("id"),(String) tableRow.get("string1"))))
    .apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)
    ...

Чтобы избежать повторного приведения ключа при использовании .by(KV::getKey), я бы порекомендовал привести его как String before:

inputRows
    .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
        .via(tableRow -> KV.of((String) tableRow.get("id"),(String) tableRow.get("name"))))
    .apply(FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)

В качестве примера я проверил это с общедоступной таблицей bigquery-public-data:london_bicycles.cycle_stations, где я записываю каждую велосипедную станцию ​​в отдельный файл:

$ cat output/file-746-00000-of-00004.txt 
Lots Road, West Chelsea

$ bq query --use_legacy_sql=false "SELECT name FROM \`bigquery-public-data.london_bicycles.cycle_stations\` WHERE id = 746"
Waiting on bqjob_<ID> ... (0s) Current status: DONE   
+-------------------------+
|          name           |
+-------------------------+
| Lots Road, West Chelsea |
+-------------------------+

Полный код:

package com.dataflow.samples;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;


public abstract class DynamicGCSWrites {

    public interface Options extends PipelineOptions {
        @Validation.Required
        @Description("Output Path i.e. gs://BUCKET/path/to/output/folder")
        String getOutput();
        void setOutput(String s);
    }

    public static void main(String[] args) {

        DynamicGCSWrites.Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DynamicGCSWrites.Options.class);

        Pipeline p = Pipeline.create(options);

        String output = options.getOutput();

        PCollection<TableRow> inputRows = p
            .apply(BigQueryIO.readTableRows()
                .from("bigquery-public-data:london_bicycles.cycle_stations")
                .withMethod(Method.DIRECT_READ)
                .withSelectedFields(Lists.newArrayList("id", "name")));

        inputRows
            .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via(tableRow -> KV.of((String) tableRow.get("id"),(String) tableRow.get("name"))))
            .apply(FileIO.<String, KV<String, String>>writeDynamic()
                .by(KV::getKey)
                .withDestinationCoder(StringUtf8Coder.of())
                .via(Contextful.fn(KV::getValue), TextIO.sink())
                .to(output)
                .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));

        p.run().waitUntilFinish();
    }
}

...