Передача переменных в ParDo вне конвейера в потоке данных - PullRequest
0 голосов
/ 23 марта 2020

Как передать переменную в функцию ParDo из конвейера в задание Dataflow. Ниже приведен пример, и я пытаюсь получить fileDate перед созданием конвейера и хочу передать его в функцию ParDo. У меня есть переменная, объявленная в интерфейсе

    public interface CsvToBq extends DataflowPipelineOptions {
    @Description("File Date")
    String getFileDate();

    void setFileDate(String value);
}

Я задаю значение в задании как

 public static void main(String[] args) {
    PipelineOptionsFactory.register(CsvToBq.class);
    CsvToBq options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(CsvToBq.class);

    Date date = new Date();     
    String fileDate = formatter.format(date);
    options.setFileDate(fileDate);

, и я обращаюсь к переменной в функции ParDo как

 private static class WikiParDo extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {

        PipelineOptions options = c.getPipelineOptions();
        String fileDate = options.getFileDate();
        String[] split = c.element().split(",");
        TableRow row = new TableRow();
        for (int i = 0; i < split.length; i++) {
            TableFieldSchema col = getTableSchema().getFields().get(i);
            row.set(col.getName(), split[i]);
        }
        row.set("file_date", fileDate);
        c.output(row);
    }
}

Вот полный код

public class CsvToBq {
public static void main(String[] args) {
    PipelineOptionsFactory.register(CsvToBq.class);
    CsvToBq options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(CsvToBq.class);

    Date date = new Date();     
    String fileDate = formatter.format(date);
    options.setFileDate(fileDate);

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply("READ", TextIO.read().from("gs://bucket/file.csv"))
            .apply("TRANSFORM", ParDo.of(new WikiParDo()))
            .apply("WRITE", BigQueryIO.writeTableRows()
                    .to(String.format("%s:dataset_name.wiki_demo", options.getProject()))
                    .withCreateDisposition(CREATE_IF_NEEDED)
                    .withWriteDisposition(WRITE_TRUNCATE)
                    .withSchema(getTableSchema()));
    pipeline.run();
}

private static TableSchema getTableSchema() {
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("wikimedia_project").setType("STRING"));
    fields.add(new TableFieldSchema().setName("language").setType("STRING"));
    fields.add(new TableFieldSchema().setName("title").setType("STRING"));
    fields.add(new TableFieldSchema().setName("views").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("file_date").setType("STRING"));
    return new TableSchema().setFields(fields);
}

public interface CsvToBq extends DataflowPipelineOptions {
    @Description("File Date")
    String getFileDate();

    void setFileDate(String value);
}

private static class WikiParDo extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {

        PipelineOptions options = c.getPipelineOptions();
        String fileDate = options.getFileDate();

        String[] split = c.element().split(",");
        TableRow row = new TableRow();
        for (int i = 0; i < split.length; i++) {
            TableFieldSchema col = getTableSchema().getFields().get(i);
            row.set(col.getName(), split[i]);
        }
        row.set("file_date", fileDate);
        c.output(row);
    }
}

}

Но это не работает. Я попытался использовать StaticValueProvider и sideinputs, но похоже, что он не служит цели.

Ответы [ 2 ]

1 голос
/ 23 марта 2020

Я думаю, вам понадобится что-то вроде:

    CsvToBq options = c.getPipelineOptions().as(CsvToBq.class);
    String fileDate = options.getFileDate();

Также, если вы не планируете использовать ValueProviders ( текущее требование для передачи параметров в шаблоны потока данных ), вы можете также сделайте что-то вроде этого:

private static class WikiParDo extends DoFn<String, TableRow> {
String fileName;
  public WikiParDo(String fileName) {
    this.fileName = fileName;
  }

Обратите внимание, что то, что вы храните, должно быть сертизуемым. joda.time Мгновенные объекты, если я помню, в порядке.

0 голосов
/ 25 марта 2020

Я сделал это, используя StaticValueProvider. Подробнее об этом можно узнать здесь

Нам нужно определить интерфейс, который расширяет опции, как показано ниже

    @Description("File Date")
    ValueProvider<String[]> getFileDate();
    void setFileDate(ValueProvider<String[]> value);

В методе main мы можем установить переменную как

    Date date = new Date();
    String fileDate = formatter.format(date);

и может быть передано в конвейерном приложении как

ParDo.of(new WikiParDo(StaticValueProvider.of(fileDate)))

, а в функции ParDo мы можем использовать, как показано ниже

    ValueProvider<String[]> fDate;
    StoreFrontBqConv(ValueProvider<String[]> fileDate) {
      this.fDate = fileDate;
    }

    fDate.get()

Вот полный код

public class CsvToBq {

public interface CsvToBq extends DataflowPipelineOptions {
    @Description("File Date")
    ValueProvider<String[]> getFileDate();
    void setFileDate(ValueProvider<String[]> value);
}

private static class WikiParDo extends DoFn<String, TableRow> {

    ValueProvider<String[]> fDate;
    StoreFrontBqConv(ValueProvider<String[]> fileDate) {
      this.fDate = fileDate;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {

        PipelineOptions options = c.getPipelineOptions();

        String[] split = c.element().split(",");
        TableRow row = new TableRow();
        for (int i = 0; i < split.length; i++) {
            TableFieldSchema col = getTableSchema().getFields().get(i);
            row.set(col.getName(), split[i]);
        }
        row.set("file_date", fDate.get());
        c.output(row);
    }
}

private static TableSchema getTableSchema() {
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("wikimedia_project").setType("STRING"));
    fields.add(new TableFieldSchema().setName("language").setType("STRING"));
    fields.add(new TableFieldSchema().setName("title").setType("STRING"));
    fields.add(new TableFieldSchema().setName("views").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("file_date").setType("STRING"));
    return new TableSchema().setFields(fields);
}

public static void main(String[] args) {
    PipelineOptionsFactory.register(CsvToBq.class);
    CsvToBq options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CsvToBq.class);

    Date date = new Date();
    String fileDate = formatter.format(date);

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply("READ", TextIO.read().from("gs://bucket/file.csv"))
            .apply("TRANSFORM", ParDo.of(new WikiParDo(StaticValueProvider.of(fileDate)))).apply("WRITE",
                    BigQueryIO.writeTableRows().to(String.format("%s:dataset_name.wiki_demo", options.getProject()))
                            .withCreateDisposition(CREATE_IF_NEEDED).withWriteDisposition(WRITE_TRUNCATE)
                            .withSchema(getTableSchema()));
    pipeline.run();
}

}

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...