Самый простой способ уменьшить вложенные PCollection TableRow - PullRequest
0 голосов
/ 01 ноября 2019

Мой случай был таким: Моя компания создала несколько таблиц в своей базе данных. У каждой таблицы есть один столбец. Я хочу составить «сводную таблицу», в которой будут собраны данные из каждой таблицы.

Я пробовал использовать приведенный ниже код. но это делает данные слишком сложными вложенными. Результат данных становится таким: PCollection<PCollection<TableRow>>

Мой основной код

Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
            Pipeline pipeline = Pipeline.create(options);

            PCollection<String> tableName = pipeline.apply(
                    "GET NLU TABLE NAME",
                    JdbcIO.<String>read()
                            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                                    options.getDriverClassName(),
                                    options.getConnectionURL())
                                    .withUsername(options.getUsername())
                                    .withPassword(options.getPassword()))
                            .withQuery("SELECT id from nlus")
                            .withCoder(StringUtf8Coder.of())
                            .withRowMapper(resultSet -> resultSet.getString(1))

            );

            tableName.apply(new TableData(options, pipeline))
            .apply(Flatten.<TableRow>pCollections()); // trying to Flatten it but doesn't work.

Мой расширенный код PTransform:

public static class TableData extends PTransform<PCollection<String>, PCollection<PCollection<TableRow>>> {
        private Options options;
        private Pipeline pipeline;

        public TableData(Options options, Pipeline pipeline) {
            super();
            this.options = options;
            this.pipeline = pipeline;
        }

        @Override
        public PCollection<PCollection<TableRow>> expand(PCollection<String> tables) {
            // TODO: SELECT DATA per table,
            PCollection<PCollection<TableRow>> data = tables.apply(
                    "GET NLU DATA",
                    ParDo.of(new DoFn<String, PCollection<TableRow>>() {
                        @ProcessElement
                        public void processElement(@Element String tableName, ProcessContext receiver) {

                            PCollection<TableRow> row = pipeline.apply(JdbcIO.<TableRow>read()
                                    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                                            options.getDriverClassName(),
                                            options.getConnectionURL())
                                            .withUsername(options.getUsername())
                                            .withPassword(options.getPassword()))
                                    .withQuery("SELECT * FROM" + tableName)
                                    .withCoder(TableRowJsonCoder.of())
                                    .withRowMapper(JdbcConverters.getResultSetToTableRow()));

                            receiver.output(row);
                        }
                    })
            );


            return data;
        }
    }

Я просто хочу, чтобы мои данные стали проще, какPCollection<TableRow> или, возможно, другой простой способ. Как мне этого добиться?

...