Как передать боковые входы / дополнительный ввод в JdbcIO RowMapper Java - PullRequest
0 голосов
/ 19 сентября 2019

Я пытаюсь прочитать таблицу SQL облака в Java-луче, используя JdbcIO.Read.Я хочу преобразовать каждую строку в Resultset в GenericData.Record, используя метод .withRowMapper (Resultset resultSet).Есть ли способ передать строку схемы JSON в качестве входных данных в метод .withRowMapper, например, ParDo принимает sideInputs как PCollectionView

Я попытался выполнить обе операции чтения (чтение из таблицы information_schema.columns и My Table в одном преобразовании JdbcIO.Read).Тем не менее, я хотел бы сначала создать Schema PCollection, а затем прочитать таблицу с помощью JdbcIO.Read

. Я создаю схему Avro для таблиц на лету следующим образом:

PCollection<String> avroSchema= pipeline.apply(JdbcIO.<String>read()
                .withDataSourceConfiguration(config)
                .withCoder(StringUtf8Coder.of())
                .withQuery("SELECT DISTINCT column_name, data_type \n" +
                        "FROM information_schema.columns\n" +
                        "WHERE table_name = " + "'" + tableName + "'")
                .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
            // code here to generate avro schema string
           // this works fine for me

}))

Создание PCollectionView, котороебудет хранить мою схему JSON для каждой таблицы.

 PCollectionView<String> s = avroSchema.apply(View.<String>asSingleton());

// I want to access this view as side input in next JdbcIO.Read operation
// something like this ;

pipeline.apply(JdbcIO.<String>read()
        .withDataSourceConfiguration(config)
        .withCoder(StringUtf8Coder.of())
        .withQuery(queryString)
        .withRowMapper(new JdbcIO.RowMapper<String>() {

            @Override
            public String mapRow(ResultSet resultSet) throws Exception {
                // access schema here and use it to parse and create 
               //GenericData.Record from ResultSet fields as per schema

                return null;
            }
        })).

    withSideInputs(My PCollectionView here); // this option is not there right now.

Есть ли лучший способ решения этой проблемы?

1 Ответ

0 голосов
/ 19 сентября 2019

На этом этапе IOs API не принимает SideInputs.

Должно быть возможным добавить ParDo сразу после чтения и выполнить отображение там.Это ParDo может принимать боковые входы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...