Мой случай был таким: Моя компания создала несколько таблиц в своей базе данных. У каждой таблицы есть один столбец. Я хочу составить «сводную таблицу», в которой будут собраны данные из каждой таблицы.
Я пробовал использовать приведенный ниже код. но это делает данные слишком сложными вложенными. Результат данных становится таким: PCollection<PCollection<TableRow>>
Мой основной код
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> tableName = pipeline.apply(
"GET NLU TABLE NAME",
JdbcIO.<String>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
options.getDriverClassName(),
options.getConnectionURL())
.withUsername(options.getUsername())
.withPassword(options.getPassword()))
.withQuery("SELECT id from nlus")
.withCoder(StringUtf8Coder.of())
.withRowMapper(resultSet -> resultSet.getString(1))
);
tableName.apply(new TableData(options, pipeline))
.apply(Flatten.<TableRow>pCollections()); // trying to Flatten it but doesn't work.
Мой расширенный код PTransform:
public static class TableData extends PTransform<PCollection<String>, PCollection<PCollection<TableRow>>> {
private Options options;
private Pipeline pipeline;
public TableData(Options options, Pipeline pipeline) {
super();
this.options = options;
this.pipeline = pipeline;
}
@Override
public PCollection<PCollection<TableRow>> expand(PCollection<String> tables) {
// TODO: SELECT DATA per table,
PCollection<PCollection<TableRow>> data = tables.apply(
"GET NLU DATA",
ParDo.of(new DoFn<String, PCollection<TableRow>>() {
@ProcessElement
public void processElement(@Element String tableName, ProcessContext receiver) {
PCollection<TableRow> row = pipeline.apply(JdbcIO.<TableRow>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
options.getDriverClassName(),
options.getConnectionURL())
.withUsername(options.getUsername())
.withPassword(options.getPassword()))
.withQuery("SELECT * FROM" + tableName)
.withCoder(TableRowJsonCoder.of())
.withRowMapper(JdbcConverters.getResultSetToTableRow()));
receiver.output(row);
}
})
);
return data;
}
}
Я просто хочу, чтобы мои данные стали проще, какPCollection<TableRow>
или, возможно, другой простой способ. Как мне этого добиться?