Я пытаюсь прочитать таблицу SQL облака в Java-луче, используя JdbcIO.Read.Я хочу преобразовать каждую строку в Resultset в GenericData.Record, используя метод .withRowMapper (Resultset resultSet).Есть ли способ передать строку схемы JSON в качестве входных данных в метод .withRowMapper, например, ParDo принимает sideInputs как PCollectionView
Я попытался выполнить обе операции чтения (чтение из таблицы information_schema.columns и My Table в одном преобразовании JdbcIO.Read).Тем не менее, я хотел бы сначала создать Schema PCollection, а затем прочитать таблицу с помощью JdbcIO.Read
. Я создаю схему Avro для таблиц на лету следующим образом:
PCollection<String> avroSchema= pipeline.apply(JdbcIO.<String>read()
.withDataSourceConfiguration(config)
.withCoder(StringUtf8Coder.of())
.withQuery("SELECT DISTINCT column_name, data_type \n" +
"FROM information_schema.columns\n" +
"WHERE table_name = " + "'" + tableName + "'")
.withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
// code here to generate avro schema string
// this works fine for me
}))
Создание PCollectionView, котороебудет хранить мою схему JSON для каждой таблицы.
PCollectionView<String> s = avroSchema.apply(View.<String>asSingleton());
// I want to access this view as side input in next JdbcIO.Read operation
// something like this ;
pipeline.apply(JdbcIO.<String>read()
.withDataSourceConfiguration(config)
.withCoder(StringUtf8Coder.of())
.withQuery(queryString)
.withRowMapper(new JdbcIO.RowMapper<String>() {
@Override
public String mapRow(ResultSet resultSet) throws Exception {
// access schema here and use it to parse and create
//GenericData.Record from ResultSet fields as per schema
return null;
}
})).
withSideInputs(My PCollectionView here); // this option is not there right now.
Есть ли лучший способ решения этой проблемы?