Как передать переменную в функцию ParDo из конвейера в задание Dataflow. Ниже приведен пример, и я пытаюсь получить fileDate перед созданием конвейера и хочу передать его в функцию ParDo. У меня есть переменная, объявленная в интерфейсе
public interface CsvToBq extends DataflowPipelineOptions {
@Description("File Date")
String getFileDate();
void setFileDate(String value);
}
Я задаю значение в задании как
public static void main(String[] args) {
PipelineOptionsFactory.register(CsvToBq.class);
CsvToBq options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(CsvToBq.class);
Date date = new Date();
String fileDate = formatter.format(date);
options.setFileDate(fileDate);
, и я обращаюсь к переменной в функции ParDo как
private static class WikiParDo extends DoFn<String, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
PipelineOptions options = c.getPipelineOptions();
String fileDate = options.getFileDate();
String[] split = c.element().split(",");
TableRow row = new TableRow();
for (int i = 0; i < split.length; i++) {
TableFieldSchema col = getTableSchema().getFields().get(i);
row.set(col.getName(), split[i]);
}
row.set("file_date", fileDate);
c.output(row);
}
}
Вот полный код
public class CsvToBq {
public static void main(String[] args) {
PipelineOptionsFactory.register(CsvToBq.class);
CsvToBq options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(CsvToBq.class);
Date date = new Date();
String fileDate = formatter.format(date);
options.setFileDate(fileDate);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("READ", TextIO.read().from("gs://bucket/file.csv"))
.apply("TRANSFORM", ParDo.of(new WikiParDo()))
.apply("WRITE", BigQueryIO.writeTableRows()
.to(String.format("%s:dataset_name.wiki_demo", options.getProject()))
.withCreateDisposition(CREATE_IF_NEEDED)
.withWriteDisposition(WRITE_TRUNCATE)
.withSchema(getTableSchema()));
pipeline.run();
}
private static TableSchema getTableSchema() {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("wikimedia_project").setType("STRING"));
fields.add(new TableFieldSchema().setName("language").setType("STRING"));
fields.add(new TableFieldSchema().setName("title").setType("STRING"));
fields.add(new TableFieldSchema().setName("views").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("file_date").setType("STRING"));
return new TableSchema().setFields(fields);
}
public interface CsvToBq extends DataflowPipelineOptions {
@Description("File Date")
String getFileDate();
void setFileDate(String value);
}
private static class WikiParDo extends DoFn<String, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
PipelineOptions options = c.getPipelineOptions();
String fileDate = options.getFileDate();
String[] split = c.element().split(",");
TableRow row = new TableRow();
for (int i = 0; i < split.length; i++) {
TableFieldSchema col = getTableSchema().getFields().get(i);
row.set(col.getName(), split[i]);
}
row.set("file_date", fileDate);
c.output(row);
}
}
}
Но это не работает. Я попытался использовать StaticValueProvider и sideinputs, но похоже, что он не служит цели.